From 7267811c0d42054e55d8fb698ccfb627c0a78317 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Wed, 27 Sep 2023 09:44:30 +0200 Subject: [PATCH] fix(core): missing subflow can lead to infinite loop (#2178) --- .../kestra/core/runners/ExecutorService.java | 12 ++++++++++++ .../java/io/kestra/core/tasks/flows/Flow.java | 10 +++++++--- .../valids/each-parallel-subflow-notfound.yml | 15 +++++++++++++++ .../io/kestra/jdbc/runner/JdbcRunnerTest.java | 19 +++++++++++++++++-- 4 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 core/src/test/resources/flows/valids/each-parallel-subflow-notfound.yml diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index 63717e1143..eec80a6aea 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -578,6 +578,18 @@ private Executor handleFlowTask(final Executor executor) { ); try { + // mark taskrun as running to avoid multiple try for failed + TaskRun taskRunByTaskRunId = executor.getExecution() + .findTaskRunByTaskRunId(workerTask.getTaskRun().getId()); + + executor.withExecution( + executor + .getExecution() + .withTaskRun(taskRunByTaskRunId.withState(State.Type.RUNNING)), + "handleFlowTaskRunning" + ); + + // create the execution Execution execution = flowTask.createExecution(runContext, flowExecutorInterface()); WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder() diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java index 182cf36d9e..44f8e1d30f 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Flow.java @@ -131,10 +131,14 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl Map flowVars = (Map) runContext.getVariables().get("flow"); + String namespace = runContext.render(this.namespace); + String flowId = runContext.render(this.flowId); + Optional revision = this.revision != null ? Optional.of(this.revision) : Optional.empty(); + io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask( - runContext.render(this.namespace), - runContext.render(this.flowId), - this.revision != null ? Optional.of(this.revision) : Optional.empty(), + namespace, + flowId, + revision, flowVars.get("namespace"), flowVars.get("id") ) diff --git a/core/src/test/resources/flows/valids/each-parallel-subflow-notfound.yml b/core/src/test/resources/flows/valids/each-parallel-subflow-notfound.yml new file mode 100644 index 0000000000..23bc9b8541 --- /dev/null +++ b/core/src/test/resources/flows/valids/each-parallel-subflow-notfound.yml @@ -0,0 +1,15 @@ +id: each-parallel-subflow-notfound +namespace: io.kestra.tests + +tasks: + - id: 1_each + type: io.kestra.core.tasks.flows.EachParallel + value: + - value-1 + - value-2 + tasks: + - id: subflow-not-exist + type: io.kestra.core.tasks.flows.Flow + flowId: "{{ taskrun.value }}" + namespace: dev + wait: true diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java index 0b404c6992..2aa4cf03a0 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcRunnerTest.java @@ -2,6 +2,9 @@ import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.LogEntry; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.State; import io.kestra.core.queues.QueueException; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; @@ -28,8 +31,8 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.matchesPattern; +import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.is; @MicronautTest(transactional = false) @TestInstance(TestInstance.Lifecycle.PER_CLASS) // must be per-class to allow calling once init() which took a lot of time @@ -126,6 +129,18 @@ void parallelNested() throws TimeoutException, QueueException { assertThat(execution.getTaskRunList(), hasSize(11)); } + @Test + void eachParallelWithSubflowMissing() throws TimeoutException { + Execution execution = runnerUtils.runOne("io.kestra.tests", "each-parallel-subflow-notfound"); + + assertThat(execution, notNullValue()); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + // on JDBC, when using an each parallel, the flow is failed even if not all subtasks of the each parallel are ended as soon as + // there is one failed task FIXME https://github.com/kestra-io/kestra/issues/2179 + // so instead of asserting that all tasks FAILED we assert that at least two failed (the each parallel and one of its subtasks) + assertThat(execution.getTaskRunList().stream().filter(taskRun -> taskRun.getState().isFailed()).count(), is(2L)); // Should be 3 + } + @Test void eachSequentialNested() throws TimeoutException { Execution execution = runnerUtils.runOne("io.kestra.tests", "each-sequential-nested", null, null, Duration.ofSeconds(60));