Skip to content

Commit

Permalink
perf: partial concurrency on tickets
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Aug 3, 2024
1 parent 37739c0 commit 4a3b7fa
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 105 deletions.
79 changes: 51 additions & 28 deletions src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.ishland.flowsched.scheduler;

import com.ishland.flowsched.structs.SimpleObjectPool;
import com.ishland.flowsched.util.Assertions;
import io.reactivex.rxjava3.core.Completable;
import it.unimi.dsi.fastutil.objects.Object2ReferenceLinkedOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2ReferenceMap;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.Objects;
Expand All @@ -19,6 +19,8 @@

public class ItemHolder<K, V, Ctx, UserData> {

private static final VarHandle FUTURES_HANDLE = MethodHandles.arrayElementVarHandle(CompletableFuture[].class);

public static final IllegalStateException UNLOADED_EXCEPTION = new IllegalStateException("Not loaded");
private static final CompletableFuture<Void> UNLOADED_FUTURE = CompletableFuture.failedFuture(UNLOADED_EXCEPTION);

Expand Down Expand Up @@ -58,11 +60,11 @@ protected void rehash(int newN) {
};
private boolean dependencyDirty = false;

ItemHolder(ItemStatus<K, V, Ctx> initialStatus, K key, SimpleObjectPool<TicketSet<K, V, Ctx>> ticketSetPool) {
ItemHolder(ItemStatus<K, V, Ctx> initialStatus, K key) {
this.unloadedStatus = Objects.requireNonNull(initialStatus);
this.status = this.unloadedStatus;
this.key = Objects.requireNonNull(key);
this.tickets = ticketSetPool.alloc();
this.tickets = new TicketSet<>(this.unloadedStatus);

ItemStatus<K, V, Ctx>[] allStatuses = initialStatus.getAllStatuses();
this.futures = new CompletableFuture[allStatuses.length];
Expand All @@ -74,11 +76,13 @@ protected void rehash(int newN) {
VarHandle.fullFence();
}

private synchronized void createFutures() {
private void createFutures() {
assertOpen();
final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
for (int i = this.unloadedStatus.ordinal() + 1; i <= targetStatus.ordinal(); i++) {
this.futures[i] = this.futures[i] == UNLOADED_FUTURE ? new CompletableFuture<>() : this.futures[i];
synchronized (this.futures) {
final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
for (int i = this.unloadedStatus.ordinal() + 1; i <= targetStatus.ordinal(); i++) {
this.futures[i] = this.futures[i] == UNLOADED_FUTURE ? new CompletableFuture<>() : this.futures[i];
}
}
}

Expand All @@ -87,7 +91,7 @@ private synchronized void createFutures() {
*
* @return the target status of this item, or null if no ticket is present
*/
public synchronized ItemStatus<K, V, Ctx> getTargetStatus() {
public ItemStatus<K, V, Ctx> getTargetStatus() {
return this.tickets.getTargetStatus();
}

Expand All @@ -107,25 +111,27 @@ public boolean isUpgrading() {

public void addTicket(ItemTicket<K, V, Ctx> ticket) {
assertOpen();
final boolean add = this.tickets.add(ticket);
if (!add) {
throw new IllegalStateException("Ticket already exists");
}
createFutures();
boolean needConsumption;
synchronized (this) {
final boolean add = this.tickets.add(ticket);
if (!add) {
throw new IllegalStateException("Ticket already exists");
}
createFutures();
needConsumption = ticket.getTargetStatus().ordinal() <= this.getStatus().ordinal();
}
if (ticket.getTargetStatus().ordinal() <= this.getStatus().ordinal()) {
if (needConsumption) {
ticket.consumeCallback();
}
}

public synchronized void removeTicket(ItemTicket<K, V, Ctx> ticket) {
public void removeTicket(ItemTicket<K, V, Ctx> ticket) {
assertOpen();
final boolean remove = this.tickets.remove(ticket);
if (!remove) {
throw new IllegalStateException("Ticket does not exist");
}
createFutures();
// createFutures();
}

public void submitOp(CompletionStage<Void> op) {
Expand All @@ -142,6 +148,10 @@ public void subscribeOp(Completable op) {
op.onErrorComplete().subscribe(this.busyRefCounter::decrementRefCount);
}

BusyRefCounter busyRefCounter() {
return this.busyRefCounter;
}

public void submitUpgradeAction(CancellationSignaller signaller) {
assertOpen();
final boolean success = this.runningUpgradeAction.compareAndSet(null, signaller);
Expand Down Expand Up @@ -169,31 +179,38 @@ public void submitOpListener(Runnable runnable) {
this.busyRefCounter.addListener(runnable);
}

public void setStatus(ItemStatus<K, V, Ctx> status) {
public boolean setStatus(ItemStatus<K, V, Ctx> status) {
assertOpen();
ItemTicket<K, V, Ctx>[] ticketsToFire = null;
CompletableFuture<Void> futureToFire = null;
synchronized (this) {
final ItemStatus<K, V, Ctx> prevStatus = this.getStatus();
Assertions.assertTrue(status != prevStatus, "duplicate setStatus call");
this.status = status;
// this.statusHistory.add(Pair.of(status, System.currentTimeMillis()));
final int compare = Integer.compare(status.ordinal(), prevStatus.ordinal());
if (compare < 0) { // status downgrade
Assertions.assertTrue(prevStatus.getPrev() == status, "Invalid status downgrade");

final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
for (int i = prevStatus.ordinal(); i < this.futures.length; i ++) {
if (i > targetStatus.ordinal()) {
this.futures[i].completeExceptionally(UNLOADED_EXCEPTION);
this.futures[i] = UNLOADED_FUTURE;
} else {
this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i];
if (this.getTargetStatus().ordinal() > status.ordinal()) {
return false;
}

this.status = status;
synchronized (this.futures) {
final ItemStatus<K, V, Ctx> targetStatus = this.getTargetStatus();
for (int i = prevStatus.ordinal(); i < this.futures.length; i ++) {
if (i > targetStatus.ordinal()) {
this.futures[i].completeExceptionally(UNLOADED_EXCEPTION);
this.futures[i] = UNLOADED_FUTURE;
} else {
this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i];
}
}
}
} else if (compare > 0) { // status upgrade
Assertions.assertTrue(prevStatus.getNext() == status, "Invalid status upgrade");

this.status = status;
final CompletableFuture<Void> future = this.futures[status.ordinal()];

Assertions.assertTrue(future != UNLOADED_FUTURE);
Expand All @@ -210,6 +227,7 @@ public void setStatus(ItemStatus<K, V, Ctx> status) {
if (futureToFire != null) {
futureToFire.complete(null);
}
return true;
}

public synchronized void setDependencies(ItemStatus<K, V, Ctx> status, KeyStatusPair<K, V, Ctx>[] dependencies) {
Expand Down Expand Up @@ -260,9 +278,10 @@ public void setFlag(int flag) {
this.flags.getAndUpdate(operand -> operand | flag);
}

public void release(SimpleObjectPool<TicketSet<K, V, Ctx>> ticketSetPool) {
void release() {
assertOpen();
this.tickets.assertEmpty();
setFlag(FLAG_REMOVED);
ticketSetPool.release(this.tickets);
}

public void addDependencyTicket(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler, K key, ItemStatus<K, V, Ctx> status, Runnable callback) {
Expand Down Expand Up @@ -337,7 +356,11 @@ public void cleanupDependencies(StatusAdvancingScheduler<K, V, Ctx, ?> scheduler
}

private void assertOpen() {
Assertions.assertTrue((this.getFlags() & FLAG_REMOVED) == 0);
Assertions.assertTrue(isOpen());
}

public boolean isOpen() {
return (this.getFlags() & FLAG_REMOVED) == 0;
}

private static class DependencyInfo {
Expand Down
54 changes: 28 additions & 26 deletions src/main/java/com/ishland/flowsched/scheduler/ItemTicket.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.ishland.flowsched.scheduler;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class ItemTicket<K, V, Ctx> {

private static final AtomicReferenceFieldUpdater<ItemTicket, Runnable> CALLBACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ItemTicket.class, Runnable.class, "callback");

private final TicketType type;
private final Object source;
private final ItemStatus<K, V, Ctx> targetStatus;
private Runnable callback = null;
private int hash = 0;
private volatile Runnable callback = null;
// private int hash = 0;

public ItemTicket(TicketType type, Object source, ItemStatus<K, V, Ctx> targetStatus, Runnable callback) {
this.type = Objects.requireNonNull(type);
Expand All @@ -30,14 +33,13 @@ public TicketType getType() {
}

public void consumeCallback() {
if (this.callback == null) return;
Runnable callback;
synchronized (this) {
callback = this.callback;
this.callback = null;
}
Runnable callback = CALLBACK_UPDATER.getAndSet(this, null);
if (callback != null) {
callback.run();
try {
callback.run();
} catch (Throwable t) {
t.printStackTrace();
}
}
}

Expand All @@ -49,11 +51,11 @@ public boolean equals(Object o) {
return type == that.type && Objects.equals(source, that.source) && Objects.equals(targetStatus, that.targetStatus);
}

public boolean equalsAlternative(ItemTicket<K, V, Ctx> that) {
if (this == that) return true;
if (that == null) return false;
return type == that.type && Objects.equals(source, that.source);
}
// public boolean equalsAlternative(ItemTicket<K, V, Ctx> that) {
// if (this == that) return true;
// if (that == null) return false;
// return type == that.type && Objects.equals(source, that.source);
// }

@Override
public int hashCode() {
Expand All @@ -66,18 +68,18 @@ public int hashCode() {
return result;
}

public int hashCodeAlternative() {
int hc = hash;
if (hc == 0) {
// inlined version of Objects.hash(type, source, targetStatus)
int result = 1;

result = 31 * result + type.hashCode();
result = 31 * result + source.hashCode();
hc = hash = result;
}
return hc;
}
// public int hashCodeAlternative() {
// int hc = hash;
// if (hc == 0) {
// // inlined version of Objects.hash(type, source, targetStatus)
// int result = 1;
//
// result = 31 * result + type.hashCode();
// result = 31 * result + source.hashCode();
// hc = hash = result;
// }
// return hc;
// }

public enum TicketType {
EXTERNAL,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ishland.flowsched.scheduler;

import com.ishland.flowsched.structs.SimpleObjectPool;
import com.ishland.flowsched.util.Assertions;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Scheduler;
Expand Down Expand Up @@ -43,12 +42,12 @@ protected void rehash(int newN) {
}
}
};
private final SimpleObjectPool<TicketSet<K, V, Ctx>> ticketSetPool = new SimpleObjectPool<>(
unused -> new TicketSet<>(getUnloadedStatus()),
TicketSet::clear,
TicketSet::clear,
4096
);
// private final SimpleObjectPool<TicketSet<K, V, Ctx>> ticketSetPool = new SimpleObjectPool<>(
// unused -> new TicketSet<>(getUnloadedStatus()),
// TicketSet::clear,
// TicketSet::clear,
// 4096
// );

protected Queue<K> createPendingUpdatesQueue() {
return new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -137,7 +136,7 @@ public boolean tick() {
if (current.equals(getUnloadedStatus())) {
// System.out.println("Unloaded: " + key);
this.onItemRemoval(holder);
holder.release(ticketSetPool);
holder.release();
final long lock = this.itemsLock.writeLock();
try {
this.items.remove(key);
Expand All @@ -152,9 +151,12 @@ public boolean tick() {
if ((holder.getFlags() & ItemHolder.FLAG_BROKEN) != 0) {
continue; // not allowed to upgrade
}
holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getExecutor()));
holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getBackgroundExecutor()));
} else {
holder.setStatus(nextStatus);
final boolean success = holder.setStatus(nextStatus);
if (!success) {
continue; // target status is modified to be higher
}
holder.submitOp(CompletableFuture.runAsync(() -> downgradeStatus0(holder, current, nextStatus, key), getExecutor()));
}
}
Expand Down Expand Up @@ -216,7 +218,6 @@ private void advanceStatus0(ItemHolder<K, V, Ctx, UserData> holder, ItemStatus<K
emitter.onComplete();
}
}))
.subscribeOn(getSchedulerBackedByBackgroundExecutor())
.observeOn(getSchedulerBackedByBackgroundExecutor())
.andThen(Completable.defer(() -> {
Assertions.assertTrue(holder.isBusy());
Expand Down Expand Up @@ -396,13 +397,18 @@ private ItemHolder<K, V, Ctx, UserData> addTicket0(K key, ItemTicket<K, V, Ctx>
ItemHolder<K, V, Ctx, UserData> holder = this.getOrCreateHolder(key);

synchronized (holder) {
if ((holder.getFlags() & ItemHolder.FLAG_REMOVED) != 0) {
if (!holder.isOpen()) {
// holder got removed before we had chance to add a ticket to it, retry
System.out.println(String.format("Retrying addTicket0(%s, %s)", key, ticket));
continue;
}
holder.busyRefCounter().incrementRefCount();
}
try {
holder.addTicket(ticket);
markDirty(key);
} finally {
holder.busyRefCounter().decrementRefCount();
}
return holder;
}
Expand All @@ -413,7 +419,7 @@ private ItemHolder<K, V, Ctx, UserData> addTicket0(K key, ItemTicket<K, V, Ctx>
}

private ItemHolder<K, V, Ctx, UserData> createHolder(K k) {
final ItemHolder<K, V, Ctx, UserData> holder1 = new ItemHolder<>(this.getUnloadedStatus(), k, ticketSetPool);
final ItemHolder<K, V, Ctx, UserData> holder1 = new ItemHolder<>(this.getUnloadedStatus(), k);
this.onItemCreation(holder1);
VarHandle.fullFence();
return holder1;
Expand Down
Loading

0 comments on commit 4a3b7fa

Please sign in to comment.