Skip to content

Commit

Permalink
fix(core): missing subflow can lead to infinite loop (#2178)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Sep 27, 2023
1 parent fef619b commit 7267811
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 5 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,18 @@ private Executor handleFlowTask(final Executor executor) {
);

try {
// mark taskrun as running to avoid multiple try for failed
TaskRun taskRunByTaskRunId = executor.getExecution()
.findTaskRunByTaskRunId(workerTask.getTaskRun().getId());

executor.withExecution(
executor
.getExecution()
.withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)),
"handleFlowTaskRunning"
);

// create the execution
Execution execution = flowTask.createExecution(runContext, flowExecutorInterface());

WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,14 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

String namespace = runContext.render(this.namespace);
String flowId = runContext.render(this.flowId);
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
runContext.render(this.namespace),
runContext.render(this.flowId),
this.revision != null ? Optional.of(this.revision) : Optional.empty(),
namespace,
flowId,
revision,
flowVars.get("namespace"),
flowVars.get("id")
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: each-parallel-subflow-notfound
namespace: io.kestra.tests

tasks:
- id: 1_each
type: io.kestra.core.tasks.flows.EachParallel
value:
- value-1
- value-2
tasks:
- id: subflow-not-exist
type: io.kestra.core.tasks.flows.Flow
flowId: "{{ taskrun.value }}"
namespace: dev
wait: true
19 changes: 17 additions & 2 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
Expand All @@ -28,8 +31,8 @@
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.is;

@MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time
Expand Down Expand Up @@ -126,6 +129,18 @@ void parallelNested() throws TimeoutException, QueueException {
assertThat(execution.getTaskRunList(), hasSize(11));
}

@Test
void eachParallelWithSubflowMissing() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-subflow-notfound");

assertThat(execution, notNullValue());
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
// on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as
// there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179
// so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks)
assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(2L)); // Should be 3
}

@Test
void eachSequentialNested() throws TimeoutException {
Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60));
Expand Down

0 comments on commit 7267811

Please sign in to comment.