State machine standby executor fixes (#6472) (#6474)
## What changed?

Cherry-picked #6472 on top of 120.
bergundy authored Sep 3, 2024
1 parent 750ba01 commit 14a5bc5
Showing 8 changed files with 201 additions and 116 deletions.
29 changes: 21 additions & 8 deletions components/dummy/tasks.go
@@ -30,8 +30,8 @@ import (
)

const (
TaskTypeTimer = "dummy.Immediate"
TaskTypeImmediate = "dummy.Timer"
TaskTypeTimer = "dummy.Timer"
TaskTypeImmediate = "dummy.Immediate"
)

type ImmediateTask struct {
@@ -66,7 +66,8 @@ func (ImmediateTaskSerializer) Serialize(hsm.Task) ([]byte, error) {
}

type TimerTask struct {
Deadline time.Time
Deadline time.Time
concurrent bool
}

var _ hsm.Task = TimerTask{}
@@ -79,21 +80,33 @@ func (t TimerTask) Kind() hsm.TaskKind {
return hsm.TaskKindTimer{Deadline: t.Deadline}
}

func (TimerTask) Concurrent() bool {
return false
func (t TimerTask) Concurrent() bool {
return t.concurrent
}

func (t TimerTask) Validate(*hsm.Node) error {
// In case the task is considered concurrent, consider it valid for now.
return nil
}

type TimerTaskSerializer struct{}

func (TimerTaskSerializer) Deserialize(data []byte, kind hsm.TaskKind) (hsm.Task, error) {
if kind, ok := kind.(hsm.TaskKindTimer); ok {
return TimerTask{Deadline: kind.Deadline}, nil
return TimerTask{Deadline: kind.Deadline, concurrent: len(data) > 0}, nil
}
return nil, fmt.Errorf("%w: expected timer", hsm.ErrInvalidTaskKind)
}

func (TimerTaskSerializer) Serialize(hsm.Task) ([]byte, error) {
return nil, nil
func (s TimerTaskSerializer) Serialize(task hsm.Task) ([]byte, error) {
if tt, ok := task.(TimerTask); ok {
if tt.concurrent {
// Non empty data marks the task as concurrent.
return []byte{1}, nil
}
return nil, nil
}
return nil, fmt.Errorf("incompatible task: %v", task)
}

func RegisterTaskSerializers(reg *hsm.Registry) error {
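Aside (not part of the commit): a minimal, self-contained sketch of the serialization convention introduced above, where a non-empty payload marks the timer task as concurrent and an empty payload marks it as not. The `markerTask` type and helpers below are hypothetical stand-ins for the real `TimerTask`/`TimerTaskSerializer`, so the round trip can be run on its own.

```go
package main

import "fmt"

// markerTask is a hypothetical stand-in for the dummy TimerTask above; the
// only state that needs to survive serialization is the concurrent flag.
type markerTask struct {
	concurrent bool
}

// serialize follows the commit's convention: non-empty data means concurrent.
func serialize(t markerTask) []byte {
	if t.concurrent {
		return []byte{1}
	}
	return nil
}

// deserialize reverses the convention: any data at all means concurrent.
func deserialize(data []byte) markerTask {
	return markerTask{concurrent: len(data) > 0}
}

func main() {
	for _, in := range []markerTask{{concurrent: true}, {concurrent: false}} {
		out := deserialize(serialize(in))
		fmt.Printf("in=%v out=%v\n", in.concurrent, out.concurrent)
	}
}
```

The deadline itself needs no payload because `Deserialize` above rebuilds it from `hsm.TaskKindTimer`.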
9 changes: 0 additions & 9 deletions components/nexusoperations/executors.go
@@ -293,9 +293,6 @@ func (e taskExecutor) loadOperationArgs(
if err := task.Validate(node); err != nil {
return err
}
if err := node.CheckRunning(); err != nil {
return err
}
operation, err := hsm.MachineData[Operation](node)
if err != nil {
return err
@@ -459,9 +456,6 @@ func (e taskExecutor) executeBackoffTask(env hsm.Environment, node *hsm.Node, ta
if err := task.Validate(node); err != nil {
return err
}
if err := node.CheckRunning(); err != nil {
return err
}
return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) {
return TransitionRescheduled.Apply(op, EventRescheduled{
Node: node,
@@ -473,9 +467,6 @@ func (e taskExecutor) executeTimeoutTask(env hsm.Environment, node *hsm.Node, ta
if err := task.Validate(node); err != nil {
return err
}
if err := node.CheckRunning(); err != nil {
return err
}
return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) {
eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken)
if err != nil {
9 changes: 9 additions & 0 deletions components/nexusoperations/tasks.go
@@ -59,6 +59,9 @@ func (TimeoutTask) Concurrent() bool {

// Validate checks if the timeout task is still valid to execute for the given node state.
func (t TimeoutTask) Validate(node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
}
op, err := hsm.MachineData[Operation](node)
if err != nil {
return err
@@ -106,6 +109,9 @@ func (InvocationTask) Concurrent() bool {
}

func (t InvocationTask) Validate(node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
}
op, err := hsm.MachineData[Operation](node)
if err != nil {
return err
@@ -152,6 +158,9 @@ func (BackoffTask) Concurrent() bool {
}

func (t BackoffTask) Validate(node *hsm.Node) error {
if err := node.CheckRunning(); err != nil {
return err
}
op, err := hsm.MachineData[Operation](node)
if err != nil {
return err
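Aside (not part of the commit): the executor changes earlier in this diff drop the `node.CheckRunning()` calls because each task's `Validate` now performs that check itself, so the same validation also runs on the standby side. A rough, self-contained sketch of the pattern, using hypothetical stand-ins for `hsm.Node` and the operation tasks:

```go
package main

import (
	"errors"
	"fmt"
)

var errWorkflowNotRunning = errors.New("workflow is not running")

// node is a hypothetical stand-in for *hsm.Node.
type node struct{ running bool }

func (n *node) CheckRunning() error {
	if !n.running {
		return errWorkflowNotRunning
	}
	return nil
}

// invocationTask is a hypothetical stand-in for the nexusoperations tasks.
// After this commit, the running check lives inside Validate, followed by the
// state-machine-specific checks (elided here).
type invocationTask struct{}

func (invocationTask) Validate(n *node) error {
	if err := n.CheckRunning(); err != nil {
		return err
	}
	return nil
}

// An executor (active or standby) only calls Validate and no longer repeats
// CheckRunning itself.
func execute(n *node, t invocationTask) error {
	return t.Validate(n)
}

func main() {
	fmt.Println(execute(&node{running: true}, invocationTask{}))  // <nil>
	fmt.Println(execute(&node{running: false}, invocationTask{})) // workflow is not running
}
```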
14 changes: 0 additions & 14 deletions service/history/ndc_standby_task_util.go
@@ -97,20 +97,6 @@ func standbyTimerTaskPostActionTaskDiscarded(
return consts.ErrTaskDiscarded
}

func standbyOutboundTaskPostActionTaskDiscarded(
_ context.Context,
taskInfo tasks.Task,
postActionInfo interface{},
logger log.Logger,
) error {
if postActionInfo == nil {
return nil
}

logger.Warn("Discarding standby outbound task due to task being pending for too long.", tag.Task(taskInfo))
return consts.ErrTaskDiscarded
}

type (
historyResendInfo struct {

138 changes: 57 additions & 81 deletions service/history/outbound_queue_standby_task_executor.go
@@ -27,6 +27,7 @@ import (
"errors"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/configs"
@@ -87,90 +88,13 @@ func (e *outboundQueueStandbyTaskExecutor) Execute(
}
}

nsName, err := e.shardContext.GetNamespaceRegistry().GetNamespaceName(
namespace.ID(task.GetNamespaceID()),
)
if err != nil {
return respond(err)
}

ref, smt, err := stateMachineTask(e.shardContext, task)
if err != nil {
return respond(err)
}

if err := validateTaskByClock(e.shardContext, task); err != nil {
return respond(err)
}

destination := ""
if dtask, ok := task.(tasks.HasDestination); ok {
destination = dtask.GetDestination()
}

actionFn := func(ctx context.Context) (any, error) {
err := e.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error {
if smt.Concurrent() {
//nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface
concurrentSmt := smt.(hsm.ConcurrentTask)
return concurrentSmt.Validate(node)
}
return nil
})
if err != nil {
if errors.Is(err, consts.ErrStaleReference) {
// If the reference is stale, then the task was already executed in
// the active queue, and there is nothing to do here.
return nil, nil
}
return nil, err
}

// If there was no error from Access nor from the accessor function, then the task
// is still valid for processing based on the current state of the machine.
// The *likely* reasons are: a) delay in the replication stack; b) destination is down.
// In any case, the task needs to be retried.
var postActionInfoErr error = consts.ErrTaskRetry
if e.config.OutboundStandbyTaskMissingEventsDestinationDownErr(nsName.String(), destination) {
// Wrap the retry error with DestinationDownError so it can trigger the circuit breaker on
// the standby side. This won't do any harm, at most some delay processing the standby task.
// Assuming the dynamic config OutboundStandbyTaskMissingEventsDiscardDelay is long enough,
// it should give enough time for the active side to execute the task successfully, and the
// standby side to process it as well without discarding the task.
postActionInfoErr = queues.NewDestinationDownError(
"standby task executor returned retryable error",
postActionInfoErr,
)
}
return postActionInfoErr, nil
}

err = e.processTask(
ctx,
task,
actionFn,
getStandbyPostActionFn(
task,
e.Now,
0, // We don't need resend delay since we don't do fetch history.
e.config.OutboundStandbyTaskMissingEventsDiscardDelay(nsName.String(), destination),
// We don't need to fetch history from remote for state machine to sync.
// So, just use the noop post action which will return a retry error
// if the task didn't succeed.
standbyTaskPostActionNoOp,
standbyOutboundTaskPostActionTaskDiscarded,
),
)

return respond(err)
return respond(e.processTask(ctx, task))
}

func (e *outboundQueueStandbyTaskExecutor) processTask(
ctx context.Context,
task tasks.Task,
actionFn func(context.Context) (any, error),
postActionFn standbyPostActionFn,
) (retError error) {
) error {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

@@ -180,15 +104,67 @@ func (e *outboundQueueStandbyTaskExecutor) processTask(
if err != nil {
return err
}

if !nsRecord.IsOnCluster(e.clusterName) {
// namespace is not replicated to local cluster, ignore corresponding tasks
return nil
}

historyResendInfo, err := actionFn(ctx)
if err := validateTaskByClock(e.shardContext, task); err != nil {
return err
}

ref, smt, err := stateMachineTask(e.shardContext, task)
if err != nil {
return err
}

err = e.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error {
if smt.Concurrent() {
//nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface
concurrentSmt := smt.(hsm.ConcurrentTask)
return concurrentSmt.Validate(node)
}
return nil
})

if err != nil {
if errors.Is(err, consts.ErrStaleReference) {
// If the reference is stale, then the task was already executed in
// the active queue, and there is nothing to do here.
return nil
}
return err
}

return postActionFn(ctx, task, historyResendInfo, e.logger)
// If there was no error from Access nor from the accessor function, then the task
// is still valid for processing based on the current state of the machine.
// The *likely* reasons are: a) delay in the replication stack; b) destination is down.
// In any case, the task needs to be retried (or discarded, based on the configured discard delay).

destination := ""
if dtask, ok := task.(tasks.HasDestination); ok {
destination = dtask.GetDestination()
}

discardTime := task.GetVisibilityTime().Add(e.config.OutboundStandbyTaskMissingEventsDiscardDelay(nsRecord.Name().String(), destination))
// now > task start time + discard delay
if e.Now().After(discardTime) {
e.logger.Warn("Discarding standby outbound task due to task being pending for too long.", tag.Task(task))
return consts.ErrTaskDiscarded
}

err = consts.ErrTaskRetry
if e.config.OutboundStandbyTaskMissingEventsDestinationDownErr(nsRecord.Name().String(), destination) {
// Wrap the retry error with DestinationDownError so it can trigger the circuit breaker on
// the standby side. This won't do any harm, at most some delay processing the standby task.
// Assuming the dynamic config OutboundStandbyTaskMissingEventsDiscardDelay is long enough,
// it should give enough time for the active side to execute the task successfully, and the
// standby side to process it as well without discarding the task.
err = queues.NewDestinationDownError(
"standby task executor returned retryable error",
err,
)
}
return err
}
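Aside (not part of the commit): the tail of the rewritten `processTask` above reduces to a small decision — once the task is known to still be valid on the standby side, discard it if it has been pending longer than the configured delay, otherwise ask for a retry, optionally wrapped so the destination circuit breaker can trip. A self-contained sketch of that decision, with hypothetical stand-ins for `consts.ErrTaskRetry`, `consts.ErrTaskDiscarded`, and `queues.NewDestinationDownError`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the real errors and wrapper used above.
var (
	errTaskRetry     = errors.New("task retry")
	errTaskDiscarded = errors.New("task discarded")
)

type destinationDownError struct{ cause error }

func (e destinationDownError) Error() string { return "destination down: " + e.cause.Error() }
func (e destinationDownError) Unwrap() error { return e.cause }

// standbyOutcome mirrors the shape of the logic above: discard when
// now > task visibility time + discard delay, otherwise retry, wrapping the
// retry error when the destination-down behavior is enabled for the
// namespace/destination pair.
func standbyOutcome(now, visibility time.Time, discardDelay time.Duration, destinationDown bool) error {
	if now.After(visibility.Add(discardDelay)) {
		return errTaskDiscarded
	}
	if destinationDown {
		return destinationDownError{cause: errTaskRetry}
	}
	return errTaskRetry
}

func main() {
	now := time.Now()
	fmt.Println(standbyOutcome(now, now.Add(-2*time.Hour), time.Hour, false)) // task discarded
	fmt.Println(standbyOutcome(now, now.Add(-time.Minute), time.Hour, true))  // destination down: task retry
}
```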
2 changes: 1 addition & 1 deletion service/history/queues/executable.go
@@ -361,7 +361,7 @@ func (e *executableImpl) isSafeToDropError(err error) bool {
// Even though ErrStaleReference is castable to serviceerror.NotFound, we give this error special treatment
// because we're interested in the metric.
metrics.TaskSkipped.With(e.taggedMetricsHandler).Record(1)
e.logger.Info("Skipped task due with stale reference", tag.Error(err))
e.logger.Info("Skipped task due to stale reference", tag.Error(err))
return true
}

8 changes: 5 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
@@ -523,10 +523,12 @@ func (t *timerQueueStandbyTaskExecutor) executeStateMachineTimerTask(
if task.Concurrent() {
//nolint:revive // concurrent tasks implements hsm.ConcurrentTask interface
concurrentTask := task.(hsm.ConcurrentTask)
return concurrentTask.Validate(node)
if err := concurrentTask.Validate(node); err != nil {
return err
}
}
// If the task is expired and still valid in the standby queue,
// then the state machine is stale.
// If the timer fired and the task is still valid in the standby queue, wait for the active cluster to
// transition and invalidate the task.
return consts.ErrTaskRetry
},
)
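Aside (not part of the commit): the standby timer path above only validates fired concurrent tasks; when a task is still valid it is retried rather than executed, leaving the active cluster to transition the state machine and invalidate the task through replication. A small sketch of that shape, with hypothetical names standing in for the hsm types and `consts.ErrTaskRetry`:

```go
package main

import (
	"errors"
	"fmt"
)

var errTaskRetry = errors.New("task retry")

// standbyTimerOutcome is a hypothetical condensation of the callback above:
// concurrent tasks are validated against the current node state; anything that
// is still valid is retried so the active cluster gets a chance to act first.
func standbyTimerOutcome(concurrent bool, validate func() error) error {
	if concurrent {
		if err := validate(); err != nil {
			// The task is no longer valid; surface that instead of retrying.
			return err
		}
	}
	return errTaskRetry
}

func main() {
	stillValid := func() error { return nil }
	fmt.Println(standbyTimerOutcome(true, stillValid)) // task retry
}
```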