Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose TimedRunnable upon TimedWorker shutdown #3856

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package reactor.core.observability.micrometer;

import java.util.Collection;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.LongTaskTimer;
Expand All @@ -27,8 +29,10 @@
import io.micrometer.core.instrument.Timer;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.observability.micrometer.TimedSchedulerMeterDocumentation.SubmittedTags;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

import static reactor.core.observability.micrometer.TimedSchedulerMeterDocumentation.*;

Expand Down Expand Up @@ -136,21 +140,32 @@ static final class TimedWorker implements Worker {
final TimedScheduler parent;
final Worker delegate;

/**
* As this Worker creates {@link TimedRunnable} instances which are {@link Disposable}
* it needs to keep track of them to be able to dispose them when this instance
* is {@link #dispose() disposed}.
*/
final Composite disposables;

TimedWorker(TimedScheduler parent, Worker delegate) {
this.parent = parent;
this.delegate = delegate;
this.disposables = Disposables.composite();
}

TimedRunnable wrap(Runnable task) {
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate, task);
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate,
task, disposables);
}

TimedRunnable wrapPeriodic(Runnable task) {
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate, task, true);
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate,
task, disposables, true);
}

@Override
public void dispose() {
disposables.dispose();
delegate.dispose();
}

Expand All @@ -162,51 +177,67 @@ public boolean isDisposed() {
@Override
public Disposable schedule(Runnable task) {
TimedRunnable timedTask = wrap(task);
disposables.add(timedTask);

return timedTask.schedule();
}

@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
TimedRunnable timedTask = wrap(task);
disposables.add(timedTask);

return timedTask.schedule(delay, unit);
}

@Override
public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
TimedRunnable timedTask = wrapPeriodic(task);
disposables.add(timedTask);

return timedTask.schedulePeriodically(initialDelay, period, unit);
}
}

private static abstract class TimedRunnable implements Runnable, Disposable {
final MeterRegistry registry;
final TimedScheduler parent;
final Runnable task;
/** marker that the Worker was disposed and the parent got notified */
static final Composite DISPOSED = new EmptyCompositeDisposable();
/** marker that the Worker has completed, for the PARENT field */
static final Composite DONE = new EmptyCompositeDisposable();

final MeterRegistry registry;
final TimedScheduler timedScheduler;
final Runnable task;

final LongTaskTimer.Sample pendingSample;

boolean isRerun;

Disposable disposable;

TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task) {
this(registry, parent, task, false);
volatile Composite parent;
static final AtomicReferenceFieldUpdater<TimedRunnable, Composite> PARENT =
AtomicReferenceFieldUpdater.newUpdater(TimedRunnable.class, Composite.class, "parent");

TimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler, Runnable task,
@Nullable Composite parent) {
this(registry, timedScheduler, task, parent, false);
}

TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task, boolean periodic) {
TimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler, Runnable task,
@Nullable Composite parent, boolean periodic) {
this.registry = registry;
this.parent = parent;
this.timedScheduler = timedScheduler;
this.task = task;

if (periodic) {
this.pendingSample = null;
}
else {
this.pendingSample = parent.pendingTasks.start();
this.pendingSample = timedScheduler.pendingTasks.start();
}
this.isRerun = false; //will be ignored if not periodic
PARENT.lazySet(this, parent);
}

@Override
Expand All @@ -220,16 +251,23 @@ public void run() {
this.isRerun = true;
}
else {
parent.submittedPeriodicIteration.increment();
timedScheduler.submittedPeriodicIteration.increment();
}
}

Runnable completionTrackingTask = parent.completedTasks.wrap(this.task);
this.parent.activeTasks.record(completionTrackingTask);
try {
Runnable completionTrackingTask = timedScheduler.completedTasks.wrap(this.task);
this.timedScheduler.activeTasks.record(completionTrackingTask);
} finally {
Composite o = parent;
if (o != DISPOSED && PARENT.compareAndSet(this, o, DONE) && o != null) {
o.remove(this);
}
}
}

public Disposable schedule() {
parent.submittedDirect.increment();
timedScheduler.submittedDirect.increment();

try {
disposable = this.internalSchedule();
Expand All @@ -241,7 +279,7 @@ public Disposable schedule() {
}

public Disposable schedule(long delay, TimeUnit unit) {
parent.submittedDelayed.increment();
timedScheduler.submittedDelayed.increment();

try {
disposable = this.internalSchedule(delay, unit);
Expand All @@ -253,7 +291,7 @@ public Disposable schedule(long delay, TimeUnit unit) {
}

public Disposable schedulePeriodically(long initialDelay, long period, TimeUnit unit) {
parent.submittedPeriodicInitial.increment();
timedScheduler.submittedPeriodicInitial.increment();
return this.internalSchedulePeriodically(initialDelay, period, unit);
}

Expand All @@ -266,6 +304,23 @@ public void dispose() {
if (pendingSample != null) {
pendingSample.stop();
}

for (;;) {
Composite o = parent;
if (o == DONE || o == DISPOSED || o == null) {
return;
}
if (PARENT.compareAndSet(this, o, DISPOSED)) {
o.remove(this);
return;
}
}
}

@Override
public boolean isDisposed() {
Composite o = PARENT.get(this);
return o == DISPOSED || o == DONE;
}

abstract Disposable internalSchedule();
Expand All @@ -279,13 +334,15 @@ static final class WorkerBackedTimedRunnable extends TimedRunnable {

final Worker worker;

WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Worker worker, Runnable task) {
super(registry, parent, task);
WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Worker worker, Runnable task, Composite parent) {
super(registry, timedScheduler, task, parent);
this.worker = worker;
}

WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Worker worker, Runnable task, boolean periodic) {
super(registry, parent, task, periodic);
WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Worker worker, Runnable task, Composite parent, boolean periodic) {
super(registry, timedScheduler, task, parent, periodic);
this.worker = worker;
}

Expand All @@ -309,13 +366,15 @@ static final class SchedulerBackedTimedRunnable extends TimedRunnable {

final Scheduler scheduler;

SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Scheduler scheduler, Runnable task) {
super(registry, parent, task);
SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Scheduler scheduler, Runnable task) {
super(registry, timedScheduler, task, null);
this.scheduler = scheduler;
}

SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Scheduler scheduler, Runnable task, boolean periodic) {
super(registry, parent, task, periodic);
SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Scheduler scheduler, Runnable task, boolean periodic) {
super(registry, timedScheduler, task, null, periodic);
this.scheduler = scheduler;
}

Expand All @@ -334,4 +393,40 @@ Disposable internalSchedulePeriodically(long initialDelay, long period, TimeUnit
return scheduler.schedulePeriodically(this, initialDelay, period, unit);
}
}

/**
* Copy of reactor.core.scheduler.EmptyCompositeDisposable for internal use.
*/
static final class EmptyCompositeDisposable implements Disposable.Composite {

@Override
public boolean add(Disposable d) {
return false;
}

@Override
public boolean addAll(Collection<? extends Disposable> ds) {
return false;
}

@Override
public boolean remove(Disposable d) {
return false;
}

@Override
public int size() {
return 0;
}

@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return false;
}

}
}
Loading
Loading