Skip to content

Commit

Permalink
Fix schedule_to_close not triggering after retry issue (#6545)
Browse files Browse the repository at this point in the history
## What changed?
1. Add "FirstScheduledTime" to ActivityInfo proto
2. Populate it. Logic - it is assigned only once, for the first time
activity is scheduled. ScheduleTime for activity is changed every time
activity is retried.
3. Change timer creation logic to use FirstScheduledTime for "schedule
to close' activity timer.

## Why?
"Schedule to close", according to our documentation, should cover the
whole lifespan of activity, including all retryes. That was not the case
- after every retry we move activityInfo.ScheduleTime forward, thus
effectivly not respecting the timeout.

## How did you test it?
1. Add more unit tests
2. Add functional tests to specifically cover this scenario.

## Potential risks
Someone is relying on the existing functionality.

## Is hotfix candidate?
It was like this for a long time, there is no need for hotfix.
  • Loading branch information
ychebotarev authored Sep 30, 2024
1 parent 28d4815 commit fc70bd9
Show file tree
Hide file tree
Showing 10 changed files with 867 additions and 611 deletions.
1,217 changes: 616 additions & 601 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ message ActivityInfo {
// workflows redirect_counter value when this activity started last time
int64 last_redirect_counter = 2;
}

// fist time activity was schedulled.
google.protobuf.Timestamp first_scheduled_time = 39;
}

// timer_map column
Expand Down
3 changes: 2 additions & 1 deletion service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,9 @@ func (r *workflowResetterImpl) failInflightActivity(
switch ai.StartedEventId {
case common.EmptyEventID:
// activity not started, noop
// override the activity time to now
// override the scheduled activity time to now
ai.ScheduledTime = timestamppb.New(now)
ai.FirstScheduledTime = timestamppb.New(now)
if err := mutableState.UpdateActivity(ai); err != nil {
return err
}
Expand Down
19 changes: 11 additions & 8 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,17 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
Version: 12,
ScheduledEventId: 123,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
FirstScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: 124,
LastHeartbeatDetails: payloads.EncodeString("some random activity 1 details"),
StartedIdentity: "some random activity 1 started identity",
}
activity2 := &persistencespb.ActivityInfo{
Version: 12,
ScheduledEventId: 456,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: common.EmptyEventID,
Version: 12,
ScheduledEventId: 456,
ScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
FirstScheduledTime: timestamppb.New(now.Add(-10 * time.Second)),
StartedEventId: common.EmptyEventID,
}
mutableState.EXPECT().GetPendingActivityInfos().Return(map[int64]*persistencespb.ActivityInfo{
activity1.ScheduledEventId: activity1,
Expand All @@ -486,10 +488,11 @@ func (s *workflowResetterSuite) TestFailInflightActivity() {
).Return(&historypb.HistoryEvent{}, nil)

mutableState.EXPECT().UpdateActivity(&persistencespb.ActivityInfo{
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
Version: activity2.Version,
ScheduledEventId: activity2.ScheduledEventId,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: activity2.StartedEventId,
}).Return(nil)

err := s.workflowResetter.failInflightActivity(now, mutableState, terminateReason)
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ func updateActivityInfoForRetries(
ai.TimerTaskStatus = TimerTaskStatusNone
ai.RetryLastWorkerIdentity = ai.StartedIdentity
ai.RetryLastFailure = failure

return ai
}
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,7 @@ func (ms *MutableStateImpl) UpdateActivityInfo(

ai.Version = incomingActivityInfo.GetVersion()
ai.ScheduledTime = incomingActivityInfo.GetScheduledTime()
// we don't need to update FirstScheduledTime
ai.StartedEventId = incomingActivityInfo.GetStartedEventId()
ai.LastHeartbeatUpdateTime = incomingActivityInfo.GetLastHeartbeatTime()
if ai.StartedEventId == common.EmptyEventID {
Expand Down Expand Up @@ -2853,6 +2854,7 @@ func (ms *MutableStateImpl) ApplyActivityTaskScheduledEvent(
firstEventID int64,
event *historypb.HistoryEvent,
) (*persistencespb.ActivityInfo, error) {

attributes := event.GetActivityTaskScheduledEventAttributes()

scheduledEventID := event.GetEventId()
Expand All @@ -2863,6 +2865,7 @@ func (ms *MutableStateImpl) ApplyActivityTaskScheduledEvent(
ScheduledEventId: scheduledEventID,
ScheduledEventBatchId: firstEventID,
ScheduledTime: event.GetEventTime(),
FirstScheduledTime: event.GetEventTime(),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: attributes.ActivityId,
Expand Down
9 changes: 8 additions & 1 deletion service/history/workflow/timer_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ func (t *timerSequenceImpl) getActivityScheduleToCloseTimeout(
return nil
}

timeoutTime := timestamp.TimeValue(activityInfo.ScheduledTime).Add(scheduleToCloseDuration)
var timeoutTime time.Time
// for backward compatibility. FirstScheduledTime can be null if mutable state was
// restored from the version before this field was introduce
if activityInfo.FirstScheduledTime != nil {
timeoutTime = timestamp.TimeValue(activityInfo.FirstScheduledTime).Add(scheduleToCloseDuration)
} else {
timeoutTime = timestamp.TimeValue(activityInfo.ScheduledTime).Add(scheduleToCloseDuration)
}

return &TimerSequenceID{
EventID: activityInfo.ScheduledEventId,
Expand Down
49 changes: 49 additions & 0 deletions service/history/workflow/timer_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_AfterWorkflo
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -353,6 +354,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_BeforeWorkfl
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -394,6 +396,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_NotCreated_NoWorkflowEx
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -435,6 +438,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_AfterWor
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -463,6 +467,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_BeforeWo
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -506,6 +511,7 @@ func (s *timerSequenceSuite) TestCreateNextActivityTimer_HeartbeatTimer_NoWorkfl
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -653,6 +659,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_NotStar
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -692,6 +699,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -738,6 +746,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -777,6 +786,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -823,6 +833,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_One_Scheduled_Started
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -862,6 +873,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_Multiple() {
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: 345,
StartedTime: timestamppb.New(now.Add(200 * time.Millisecond)),
ActivityId: "some random activity ID",
Expand All @@ -877,6 +889,7 @@ func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_Multiple() {
Version: 123,
ScheduledEventId: 2345,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "other random activity ID",
Expand Down Expand Up @@ -1140,6 +1153,7 @@ func (s *timerSequenceSuite) TestGetActivityScheduleToCloseTimeout_WithTimeout_S
Version: 123,
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
FirstScheduledTime: timestamppb.New(now),
StartedEventId: common.EmptyEventID,
StartedTime: nil,
ActivityId: "some random activity ID",
Expand Down Expand Up @@ -1570,3 +1584,38 @@ func (s *timerSequenceSuite) TestLess_CompareType() {
s.True(timerSequenceIDs.Less(0, 1))
s.False(timerSequenceIDs.Less(1, 0))
}

func (s *timerSequenceSuite) TestLoadAndSortActivityTimers_FirstScheduledTime() {
now := time.Now().UTC()
activityInfo := &persistencespb.ActivityInfo{
ScheduledEventId: 234,
ScheduledTime: timestamppb.New(now),
ScheduleToStartTimeout: timestamp.DurationFromSeconds(10),
ScheduleToCloseTimeout: timestamp.DurationFromSeconds(1000),
StartToCloseTimeout: timestamp.DurationFromSeconds(100),
HeartbeatTimeout: timestamp.DurationFromSeconds(1),
TimerTaskStatus: TimerTaskStatusCreatedScheduleToClose | TimerTaskStatusCreatedScheduleToStart,
Attempt: 12,
}
activityInfo.FirstScheduledTime = timestamppb.New(now.Add(1 * time.Second))
activityInfos := map[int64]*persistencespb.ActivityInfo{activityInfo.ScheduledEventId: activityInfo}
s.mockMutableState.EXPECT().GetPendingActivityInfos().Return(activityInfos)

timerSequenceIDs := s.timerSequence.LoadAndSortActivityTimers()
s.Equal([]TimerSequenceID{
{
EventID: activityInfo.ScheduledEventId,
Timestamp: activityInfo.ScheduledTime.AsTime().Add(activityInfo.ScheduleToStartTimeout.AsDuration()),
TimerType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
TimerCreated: true,
Attempt: activityInfo.Attempt,
},
{
EventID: activityInfo.ScheduledEventId,
Timestamp: activityInfo.FirstScheduledTime.AsTime().Add(activityInfo.ScheduleToCloseTimeout.AsDuration()),
TimerType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
TimerCreated: true,
Attempt: activityInfo.Attempt,
},
}, timerSequenceIDs)
}
Loading

0 comments on commit fc70bd9

Please sign in to comment.