Commit
chore(version): update to version 'v0.9.2'.
tchiotludo committed Jun 5, 2023
2 parents f11e55b + 1cace91 commit cd25424
Showing 33 changed files with 178 additions and 121 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/runners/FlowListeners.java
@@ -103,7 +103,7 @@ private boolean remove(Flow flow) {
}
}

private synchronized void upsert(Flow flow) {
private void upsert(Flow flow) {
synchronized (this) {
this.remove(flow);

@@ -126,7 +126,7 @@ private void notifyConsumersEach(Flow flow, Flow previous) {
}

@Override
public synchronized void listen(Consumer<List<Flow>> consumer) {
public void listen(Consumer<List<Flow>> consumer) {
synchronized (this) {
consumers.add(consumer);
consumer.accept(new ArrayList<>(this.flows()));
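For context, the FlowListeners change above replaces synchronized methods with explicit synchronized (this) blocks. Both forms lock the same instance monitor; the block form simply makes the guarded region visible and easier to narrow or move to a dedicated lock later. A minimal sketch of the equivalence (illustrative class, not Kestra code):

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: contrasts a synchronized method with an
// equivalent synchronized block on the same monitor.
class ListenerRegistry {
    private final List<Runnable> consumers = new ArrayList<>();

    // Form 1: the whole method body is guarded by the instance monitor.
    public synchronized void listenAsMethod(Runnable consumer) {
        consumers.add(consumer);
    }

    // Form 2: an explicit block on `this` guards the same monitor, but the
    // critical section is visible and can later be shrunk or switched to a
    // dedicated lock object.
    public void listenAsBlock(Runnable consumer) {
        synchronized (this) {
            consumers.add(consumer);
        }
    }
}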
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
@@ -347,9 +347,9 @@ public Execution awaitExecution(Predicate<Execution> predicate, Runnable executi
executionEmitter.run();

if (duration == null) {
Await.until(() -> receive.get() != null);
Await.until(() -> receive.get() != null, Duration.ofMillis(10));
} else {
Await.until(() -> receive.get() != null, null, duration);
Await.until(() -> receive.get() != null, Duration.ofMillis(10), duration);
}

cancel.run();
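The RunnerUtils change above passes an explicit 10 ms poll interval to Await.until instead of relying on a default. As a rough sketch of what a three-argument polling helper of that shape can look like (illustrative only, not Kestra's Await implementation; names and behaviour are assumptions):

import java.time.Duration;
import java.util.function.BooleanSupplier;

// Illustrative sketch of a poll-until-true helper with an explicit poll
// interval and an overall timeout.
final class PollingAwait {
    private PollingAwait() {
    }

    static void until(BooleanSupplier condition, Duration pollInterval, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        // Re-check the condition at the given interval until it holds or the timeout elapses.
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}

Read this way, Await.until(() -> receive.get() != null, Duration.ofMillis(10), duration) re-checks the queue roughly every 10 ms rather than spinning as fast as possible.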
89 changes: 50 additions & 39 deletions core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java
@@ -54,23 +54,29 @@ public abstract class AbstractScheduler implements Scheduler {
private final MetricRegistry metricRegistry;
private final ConditionService conditionService;
private final TaskDefaultService taskDefaultService;

protected SchedulerExecutionStateInterface executionState;
protected SchedulerTriggerStateInterface triggerState;
protected Boolean isReady = false;

private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private final ListeningExecutorService cachedExecutor;
private final Map<String, ZonedDateTime> lastEvaluate = new ConcurrentHashMap<>();

// The evaluateRunningLock must be used when accessing evaluateRunning or evaluateRunningCount
private final Object evaluateRunningLock = new Object();
private final Map<String, ZonedDateTime> evaluateRunning = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> evaluateRunningCount = new ConcurrentHashMap<>();

// The triggerStateSavedLock must be used when accessing triggerStateSaved
private final Object triggerStateSavedLock = new Object();
private final Map<String, Trigger> triggerStateSaved = new ConcurrentHashMap<>();
protected SchedulerTriggerStateInterface triggerState;

@Getter
private List<FlowWithTrigger> schedulable = new ArrayList<>();

// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@Getter
private volatile List<FlowWithTrigger> schedulable = new ArrayList<>();
@Getter
private Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new HashMap<>();
private volatile Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new HashMap<>();

@SuppressWarnings("unchecked")
@Inject
@@ -127,13 +133,13 @@ public void run() {

// remove trigger on flow update
this.flowListeners.listen((flow, previous) -> {
synchronized (this) {
synchronized (triggerStateSavedLock) {
if (flow.isDeleted()) {
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> {
Trigger trigger = Trigger.of(flow, abstractTrigger);
triggerStateSaved.remove(trigger.uid());

triggerStateSaved.remove(trigger.uid());
triggerQueue.delete(trigger);
});
} else if (previous != null) {
@@ -150,8 +156,9 @@ public void run() {
});
}

private void computeSchedulable(List<Flow> flows) {
schedulableNextDate = new HashMap<>();
// must be synchronized as it updates schedulableNextDate and schedulable, and will be executed on the flow listener thread
private synchronized void computeSchedulable(List<Flow> flows) {
this.schedulableNextDate = new HashMap<>();

this.schedulable = flows
.stream()
@@ -186,17 +193,17 @@ private void handle() {

ZonedDateTime now = now();

synchronized (this) {
if (log.isDebugEnabled()) {
log.debug(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}
if (log.isDebugEnabled()) {
log.debug(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}

// get all that is ready from evaluation
synchronized (this) {
// get all triggers that are ready from evaluation
List<FlowWithPollingTriggerNextDate> readyForEvaluate = schedulable
.stream()
.filter(f -> conditionService.isValid(f.getFlow(), f.getTrigger(), f.getConditionContext()))
@@ -219,17 +226,15 @@ private void handle() {
.filter(f -> this.isEvaluationInterval(f, now))
.filter(f -> this.isExecutionNotRunning(f, now))
.map(f -> {
synchronized (this) {
Trigger lastTrigger = this.getLastTrigger(f, now);
Trigger lastTrigger = this.getLastTrigger(f, now);

return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
}
return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
.toList();

if (log.isDebugEnabled()) {
log.debug(
@@ -320,7 +325,8 @@ private void handleEvaluatePollingTriggerResult(SchedulerExecutionWithTrigger re
}

private void addToRunning(TriggerContext triggerContext, ZonedDateTime now) {
synchronized (this) {
synchronized (evaluateRunningLock) {
// TODO see if we can move this to the time we first met a new trigger to avoid calling it at each trigger execution
this.evaluateRunningCount.computeIfAbsent(triggerContext.uid(), s -> metricRegistry
.gauge(MetricRegistry.SCHEDULER_EVALUATE_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(triggerContext)));

@@ -331,7 +337,7 @@ private void addToRunning(TriggerContext triggerContext, ZonedDateTime now) {


private void removeFromRunning(TriggerContext triggerContext) {
synchronized (this) {
synchronized (evaluateRunningLock) {
if (this.evaluateRunning.remove(triggerContext.uid()) == null) {
throw new IllegalStateException("Can't remove trigger '" + triggerContext.uid() + "' from running");
}
@@ -437,18 +443,21 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) {
.updatedDate(Instant.now())
.build();

// TODO ask ludo about the comment as it doesn't seem to be accurate
// we don't find, so never started execution, create a trigger context with previous date in the past.
// this allows some edge case when the evaluation loop of schedulers will change second
// between start and end
if (triggerStateSaved.containsKey(build.uid())) {
Trigger cachedTrigger = triggerStateSaved.get(build.uid());
synchronized (triggerStateSavedLock) {
if (triggerStateSaved.containsKey(build.uid())) {
Trigger cachedTrigger = triggerStateSaved.get(build.uid());

triggerState.save(build);
triggerStateSaved.remove(build.uid());
triggerState.save(build);
triggerStateSaved.remove(build.uid());

return cachedTrigger;
} else {
triggerStateSaved.put(build.uid(), build);
return cachedTrigger;
} else {
triggerStateSaved.put(build.uid(), build);
}
}

return build;
@@ -482,14 +491,16 @@ private boolean isEvaluationInterval(FlowWithPollingTrigger flowWithPollingTrigg
return result;
}

protected synchronized void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger) {
protected void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger) {
Trigger trigger = Trigger.of(
executionWithTrigger.getTriggerContext(),
executionWithTrigger.getExecution()
);

this.triggerState.save(trigger);
this.executionQueue.emit(executionWithTrigger.getExecution());
synchronized (triggerStateSavedLock) {
this.triggerState.save(trigger);
this.executionQueue.emit(executionWithTrigger.getExecution());
}
}

private static ZonedDateTime now() {
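Overall, the AbstractScheduler change replaces broad synchronized (this) sections with dedicated lock objects (evaluateRunningLock, triggerStateSavedLock), each guarding one group of fields, and marks schedulable/schedulableNextDate volatile because they are written by the flow-listener thread and read by the scheduler loop. A minimal sketch of that lock-splitting pattern, using illustrative names that are not Kestra's:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: two independent locks guard two unrelated groups
// of state, so updating one never blocks the other, and a volatile field
// publishes a recomputed map from one thread to another.
class SplitLockScheduler {
    private final Object runningLock = new Object();
    private final Object savedLock = new Object();

    private final Map<String, Long> running = new ConcurrentHashMap<>();
    private final Map<String, String> savedTriggers = new ConcurrentHashMap<>();

    // Written by the listener thread, read by the scheduler loop.
    private volatile Map<String, String> nextDates = Map.of();

    void addToRunning(String uid) {
        synchronized (runningLock) {
            running.put(uid, System.currentTimeMillis());
        }
    }

    void saveTrigger(String uid, String payload) {
        synchronized (savedLock) {
            savedTriggers.put(uid, payload);
        }
    }

    void recompute(Map<String, String> computed) {
        this.nextDates = Map.copyOf(computed); // single volatile write publishes the new map
    }
}

The diff applies the same idea: trigger-state persistence and the running-trigger bookkeeping no longer contend on the scheduler's single monitor.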
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/utils/TestsUtils.java
@@ -112,7 +112,7 @@ public static Map.Entry<ConditionContext, TriggerContext> mockTrigger(RunContext

return new AbstractMap.SimpleEntry<>(
ConditionContext.builder()
.runContext(runContextFactory.of(flow, trigger))
.runContext(runContextFactory.of(flow, trigger).forScheduler(flow, trigger))
.flow(flow)
.build(),
triggerContext
@@ -319,10 +319,10 @@ protected void taskRunsDailyStatistics() {
@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
protected void executionsCount() throws InterruptedException {
for (int i = 0; i < 28; i++) {
for (int i = 0; i < 14; i++) {
executionRepository.save(builder(
State.Type.SUCCESS,
i < 4 ? "first" : (i < 10 ? "second" : "third")
i < 2 ? "first" : (i < 5 ? "second" : "third")
).build());
}

@@ -342,9 +342,9 @@ protected void executionsCount() throws InterruptedException {
);

assertThat(result.size(), is(4));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(4L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(6L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(18L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(2L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(3L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(9L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("missing")).findFirst().get().getCount(), is(0L));
}
}
@@ -28,6 +28,7 @@
import jakarta.inject.Singleton;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.io.IOException;
import java.net.URISyntaxException;
@@ -36,8 +37,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.validation.ConstraintViolationException;

@@ -46,6 +45,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

@MicronautTest(transactional = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;
@@ -60,12 +60,12 @@ void all() {
"flow",
null,
counter.getName(),
ZonedDateTime.now().minusDays(190),
ZonedDateTime.now().minusWeeks(26),
ZonedDateTime.now(),
"sum"
);

assertThat(aggregationResults.getAggregations().size(), is(28));
assertThat(aggregationResults.getAggregations().size(), is(27));
assertThat(aggregationResults.getGroupBy(), is("week"));

}
@@ -1,7 +1,5 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.TaskDefaultService;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import lombok.SneakyThrows;
@@ -11,13 +11,16 @@

import java.time.Duration;
import java.util.List;
import java.util.Optional;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static io.kestra.core.utils.Rethrow.throwRunnable;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Singleton
public class RestartCaseTest {
@@ -138,7 +141,9 @@ public void restartFailedThenFailureWithLocalErrors() throws Exception {
assertThat(finishedRestartedExecution.getParentId(), nullValue());
assertThat(finishedRestartedExecution.getTaskRunList().size(), is(5));

assertThat(finishedRestartedExecution.getTaskRunList().get(3).getAttempts().size(), is(2));
Optional<TaskRun> taskRun = finishedRestartedExecution.findTaskRunsByTaskId("failStep").stream().findFirst();
assertTrue(taskRun.isPresent());
assertThat(taskRun.get().getAttempts().size(), is(2));

assertThat(finishedRestartedExecution.getState().getCurrent(), is(State.Type.FAILED));
}
8 changes: 4 additions & 4 deletions core/src/test/java/io/kestra/core/runners/RetryTest.java
@@ -13,8 +13,7 @@
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

public class RetryTest extends AbstractMemoryRunnerTest {
@Inject
@@ -40,8 +39,9 @@ void retryFailed() throws TimeoutException {
assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(5));

// be sure attempts are available on queue
assertThat(executions.size(), is(19));
// be sure attempts are available on the queue
// we cannot know the exact number of executions, but we should have at least 15 of them
assertThat(executions.size(), greaterThan(15));
assertThat(executions.get(8).getTaskRunList().get(0).getAttempts().size(), is(3));
}
}
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,3 +1,3 @@
version=0.9.1
version=0.9.2
micronautVersion=3.9.0
lombokVersion=1.18.26
@@ -1,7 +1,23 @@
package io.kestra.repository.h2;

import io.kestra.jdbc.repository.AbstractJdbcFlowRepositoryTest;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.net.URISyntaxException;

public class H2lFlowRepositoryTest extends AbstractJdbcFlowRepositoryTest {

// On H2 we must reset the database and init the flow repository in the same method.
// That's why the setup is overridden to do nothing and the init will do the setup.
@Override
protected void setup() {
}

@Override
@BeforeEach // on H2 we must reset the
protected void init() throws IOException, URISyntaxException {
super.setup();
super.init();
}
}
7 changes: 6 additions & 1 deletion jdbc-h2/src/test/resources/application.yml
@@ -61,4 +61,9 @@ kestra:
cls: io.kestra.core.models.Setting
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
cls: io.kestra.core.models.topologies.FlowTopology

queues:
min-poll-interval: 10ms
max-poll-interval: 100ms
poll-switch-interval: 5s
(Remaining changed files not shown.)
