Skip to content

Commit

Permalink
foreach: add foreach_strategy property to ensure sequence between f…
Browse files Browse the repository at this point in the history
…oreach elements (#211)

Each elements of a foreach loop are run in parallel.
`foreach_strategy` control the strategy in which the elements will be run
sequentially or in parallel.

Closes #197

Signed-off-by: Romain Beuque <[email protected]>
  • Loading branch information
rbeuque74 committed Jan 13, 2021
1 parent acc23c0 commit d8a0b5a
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 24 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,11 @@ This output can be then passed to another step in json format:
foreach: '{{.step.prefixStrings.children | toJson}}'
```

It's possible to configure the strategy used to run each elements: default strategy is `parallel`: each elements will be run in parallel to maximize throughput ; `sequence` will run each element when the previous one is done, to ensure the sequence between elements. It can be declared in the template as is:
```yaml
foreach_strategy: "sequence"
```

#### Resources <a name="resources"></a>

Resources are a way to restrict the concurrency factor of certain operations, to control the throughput and avoid dangerous behavior e.g. flooding the targets.
Expand Down
11 changes: 11 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ func expandStep(s *step.Step, res *resolution.Resolution) {
s.Error = err.Error()
return
}

var previousChildStepName string
// generate all children steps
for i, item := range items {
childStepName := fmt.Sprintf("%s-%d", s.Name, i)
Expand All @@ -736,6 +738,15 @@ func expandStep(s *step.Step, res *resolution.Resolution) {
Resources: s.Resources,
Item: item,
}

if s.ForEachStrategy == step.ForEachStrategySequence {
if previousChildStepName != "" {
res.Steps[childStepName].Dependencies = append(res.Steps[childStepName].Dependencies, previousChildStepName)
}

previousChildStepName = childStepName
}

delete(res.ForeachChildrenAlreadyContracted, childStepName)
}
// update parent dependencies to wait on children
Expand Down
52 changes: 52 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ghodss/yaml"
"github.com/juju/errors"
"github.com/loopfz/gadgeto/zesty"
"github.com/maxatome/go-testdeep/td"
"github.com/ovh/configstore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/ovh/utask/engine/functions"
functionrunner "github.com/ovh/utask/engine/functions/runner"
"github.com/ovh/utask/engine/step"
"github.com/ovh/utask/engine/step/condition"
"github.com/ovh/utask/engine/values"
"github.com/ovh/utask/models/resolution"
"github.com/ovh/utask/models/task"
Expand Down Expand Up @@ -644,6 +646,56 @@ func TestForeach(t *testing.T) {
assert.Equal(t, "foo-b-bar-b", firstItemOutput["concat"])
}

func TestForeachWithChainedIterations(t *testing.T) {
_, require := td.AssertRequire(t)
res, err := createResolution("foreach.yaml", map[string]interface{}{
"list": []interface{}{"a", "b", "c", "d", "e"},
}, nil)
require.Nil(err)
require.NotNil(res)

res.Steps["generateItems"].Conditions[0].Then["this"] = "DONE"
res.Steps["generateItems"].Conditions = append(
res.Steps["generateItems"].Conditions,
&condition.Condition{
Type: condition.CHECK,
If: []*condition.Assert{
{
Value: "{{.iterator}}",
Operator: condition.EQ,
Expected: "d",
},
},
Then: map[string]string{
"this": "SERVER_ERROR",
},
},
)
res.Steps["generateItems"].ForEachStrategy = "sequence"
err = updateResolution(res)
require.Nil(err)

res, err = runResolution(res)
require.NotNil(res)
require.Nil(err)
require.Cmp(res.State, resolution.StateError)

td.Cmp(t, res.Steps["emptyLoop"].State, step.StateDone) // running on empty collection is ok
td.Cmp(t, res.Steps["concatItems"].State, step.StateTODO)
td.Cmp(t, res.Steps["finalStep"].State, step.StateTODO)
td.Cmp(t, res.Steps["bStep"].State, "B")
td.Cmp(t, res.Steps["generateItems-0"].State, step.StateDone)
td.Cmp(t, res.Steps["generateItems-1"].State, step.StateDone)
td.Cmp(t, res.Steps["generateItems-2"].State, step.StateDone)
td.Cmp(t, res.Steps["generateItems-3"].State, step.StateServerError)
td.Cmp(t, res.Steps["generateItems-4"].State, step.StateTODO)
td.CmpLen(t, res.Steps["generateItems-0"].Dependencies, 0)
td.Cmp(t, res.Steps["generateItems-1"].Dependencies, []string{"generateItems-0"})
td.Cmp(t, res.Steps["generateItems-2"].Dependencies, []string{"generateItems-1"})
td.Cmp(t, res.Steps["generateItems-3"].Dependencies, []string{"generateItems-2"})
td.Cmp(t, res.Steps["generateItems-4"].Dependencies, []string{"generateItems-3"})
}

func TestForeachWithPreRun(t *testing.T) {
input := map[string]interface{}{}
res, err := createResolution("foreachAndPreRun.yaml", input, nil)
Expand Down
71 changes: 70 additions & 1 deletion engine/step/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const (
RetrySeconds = "seconds"
RetryMinutes = "minutes"
RetryHours = "hours"

ForEachStrategyParallel = "parallel"
ForEachStrategySequence = "sequence"
)

// possible states of a step
Expand Down Expand Up @@ -106,7 +109,8 @@ type Step struct {
Conditions []*condition.Condition `json:"conditions,omitempty"`
skipped bool
// loop
ForEach string `json:"foreach,omitempty"` // "parent" step: expression for list of items
ForEach string `json:"foreach,omitempty"` // "parent" step: expression for list of items
ForEachStrategy string `json:"foreach_strategy"`
ChildrenSteps []string `json:"children_steps,omitempty"` // list of children names
ChildrenStepMap map[string]bool `json:"children_steps_map,omitempty"`
Item interface{} `json:"item,omitempty"` // "child" step: item value, issued from foreach
Expand Down Expand Up @@ -558,6 +562,71 @@ func (st *Step) ValidAndNormalize(name string, baseConfigs map[string]json.RawMe
}
}

// check that we don't set restricted field from the template
if st.State != "" {
return errors.NewNotValid(nil, "step state must not be set")
}

if st.ChildrenSteps != nil {
return errors.NewNotValid(nil, "step children_steps must not be set")
}

if st.ChildrenStepMap != nil {
return errors.NewNotValid(nil, "step children_steps_map must not be set")
}

if st.Output != nil {
return errors.NewNotValid(nil, "step output must not be set")
}

if st.Metadata != nil {
return errors.NewNotValid(nil, "step metadatas must not be set")
}

if st.Tags != nil {
return errors.NewNotValid(nil, "step tags must not be set")
}

if st.Children != nil {
return errors.NewNotValid(nil, "step children must not be set")
}

if st.Error != "" {
return errors.NewNotValid(nil, "step error must not be set")
}

if st.Metadata != nil {
return errors.NewNotValid(nil, "step metadatas must not be set")
}

if st.TryCount != 0 {
return errors.NewNotValid(nil, "step try_count must not be set")
}

t := time.Time{}
if st.LastRun != t {
return errors.NewNotValid(nil, "step last_time must not be set")
}

if st.Item != nil {
return errors.NewNotValid(nil, "step item must not be set")
}

if st.ForEachStrategy != "" && st.ForEach == "" {
return errors.NewNotValid(nil, "step foreach_strategy can't be set without foreach")
}

if st.ForEach != "" {
switch st.ForEachStrategy {
case ForEachStrategyParallel, ForEachStrategySequence:
case "":
// expliciting default value
st.ForEachStrategy = ForEachStrategyParallel
default:
return errors.NewNotValid(nil, fmt.Sprintf("step foreach_strategy %q is not a valid value", st.ForEachStrategy))
}
}

// valid execution delay
if st.ExecutionDelay < 0 || st.ExecutionDelay > maxExecutionDelay {
return errors.NewNotValid(nil,
Expand Down
48 changes: 25 additions & 23 deletions engine/templates_tests/foreach.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,66 @@ inputs:
steps:
emptyLoop:
description: a foreach step with empty input
foreach: '[]'
foreach: "[]"
action:
type: echo
configuration:
output: {foo: bar}
output: { foo: bar }
generateItems:
description: generate list for next step
foreach: '{{.input.list | toJson}}'
foreach: "{{.input.list | toJson}}"
conditions:
- type: skip
if:
- value: '{{.iterator}}'
operator: EQ
expected: a
- value: "{{.iterator}}"
operator: EQ
expected: a
then:
this: PRUNE
this: PRUNE
- type: check
if:
- value: '{{.iterator}}'
operator: EQ
expected: b
- value: "{{.iterator}}"
operator: EQ
expected: b
then:
bStep: B
bStep: B
action:
type: echo
configuration:
output:
foo: 'foo-{{.iterator}}'
bar: 'bar-{{.iterator}}'
foo: "foo-{{.iterator}}"
bar: "bar-{{.iterator}}"
concatItems:
description: transform a list of items
description: transform a list of items
dependencies: [generateItems]
foreach: '{{.step.generateItems.children | toJson}}'
foreach: "{{.step.generateItems.children | toJson}}"
conditions:
- type: check
if:
- value: '{{ index .step "this" "output" "concat"}}'
operator: EQ
expected: foo-c-bar-c
- value: '{{ index .step "this" "output" "concat"}}'
operator: EQ
expected: foo-c-bar-c
then:
this: PRUNE
this: PRUNE
action:
type: echo
configuration:
output: {concat: '{{.iterator.output.foo}}-{{.iterator.output.bar}}'}
output:
{
concat: "{{.iterator.output.foo}}-{{.iterator.output.bar}}",
}
bStep:
description: impacted by concatItems b step
dependencies: [generateItems]
custom_states: [B]
action:
type: echo
configuration:
output: {foo: 42}
output: { foo: 42 }
finalStep:
description: pruned by concatItems("c")
dependencies: [concatItems]
action:
type: echo
configuration:
output: {foo: bar}

output: { foo: bar }

0 comments on commit d8a0b5a

Please sign in to comment.