diff --git a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java index 9a90875..0788858 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java +++ b/src/main/java/com/ishland/flowsched/scheduler/ItemHolder.java @@ -1,6 +1,5 @@ package com.ishland.flowsched.scheduler; -import com.ishland.flowsched.structs.SimpleObjectPool; import com.ishland.flowsched.util.Assertions; import io.reactivex.rxjava3.core.Completable; import it.unimi.dsi.fastutil.objects.Object2ReferenceLinkedOpenHashMap; @@ -8,6 +7,7 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator; +import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.Arrays; import java.util.Objects; @@ -19,6 +19,8 @@ public class ItemHolder { + private static final VarHandle FUTURES_HANDLE = MethodHandles.arrayElementVarHandle(CompletableFuture[].class); + public static final IllegalStateException UNLOADED_EXCEPTION = new IllegalStateException("Not loaded"); private static final CompletableFuture UNLOADED_FUTURE = CompletableFuture.failedFuture(UNLOADED_EXCEPTION); @@ -58,11 +60,11 @@ protected void rehash(int newN) { }; private boolean dependencyDirty = false; - ItemHolder(ItemStatus initialStatus, K key, SimpleObjectPool> ticketSetPool) { + ItemHolder(ItemStatus initialStatus, K key) { this.unloadedStatus = Objects.requireNonNull(initialStatus); this.status = this.unloadedStatus; this.key = Objects.requireNonNull(key); - this.tickets = ticketSetPool.alloc(); + this.tickets = new TicketSet<>(this.unloadedStatus); ItemStatus[] allStatuses = initialStatus.getAllStatuses(); this.futures = new CompletableFuture[allStatuses.length]; @@ -74,11 +76,13 @@ protected void rehash(int newN) { VarHandle.fullFence(); } - private synchronized void createFutures() { + private void createFutures() { assertOpen(); - final ItemStatus targetStatus = this.getTargetStatus(); - for (int i = this.unloadedStatus.ordinal() + 1; i <= targetStatus.ordinal(); i++) { - this.futures[i] = this.futures[i] == UNLOADED_FUTURE ? new CompletableFuture<>() : this.futures[i]; + synchronized (this.futures) { + final ItemStatus targetStatus = this.getTargetStatus(); + for (int i = this.unloadedStatus.ordinal() + 1; i <= targetStatus.ordinal(); i++) { + this.futures[i] = this.futures[i] == UNLOADED_FUTURE ? new CompletableFuture<>() : this.futures[i]; + } } } @@ -87,7 +91,7 @@ private synchronized void createFutures() { * * @return the target status of this item, or null if no ticket is present */ - public synchronized ItemStatus getTargetStatus() { + public ItemStatus getTargetStatus() { return this.tickets.getTargetStatus(); } @@ -107,25 +111,27 @@ public boolean isUpgrading() { public void addTicket(ItemTicket ticket) { assertOpen(); + final boolean add = this.tickets.add(ticket); + if (!add) { + throw new IllegalStateException("Ticket already exists"); + } + createFutures(); + boolean needConsumption; synchronized (this) { - final boolean add = this.tickets.add(ticket); - if (!add) { - throw new IllegalStateException("Ticket already exists"); - } - createFutures(); + needConsumption = ticket.getTargetStatus().ordinal() <= this.getStatus().ordinal(); } - if (ticket.getTargetStatus().ordinal() <= this.getStatus().ordinal()) { + if (needConsumption) { ticket.consumeCallback(); } } - public synchronized void removeTicket(ItemTicket ticket) { + public void removeTicket(ItemTicket ticket) { assertOpen(); final boolean remove = this.tickets.remove(ticket); if (!remove) { throw new IllegalStateException("Ticket does not exist"); } - createFutures(); +// createFutures(); } public void submitOp(CompletionStage op) { @@ -142,6 +148,10 @@ public void subscribeOp(Completable op) { op.onErrorComplete().subscribe(this.busyRefCounter::decrementRefCount); } + BusyRefCounter busyRefCounter() { + return this.busyRefCounter; + } + public void submitUpgradeAction(CancellationSignaller signaller) { assertOpen(); final boolean success = this.runningUpgradeAction.compareAndSet(null, signaller); @@ -169,31 +179,38 @@ public void submitOpListener(Runnable runnable) { this.busyRefCounter.addListener(runnable); } - public void setStatus(ItemStatus status) { + public boolean setStatus(ItemStatus status) { assertOpen(); ItemTicket[] ticketsToFire = null; CompletableFuture futureToFire = null; synchronized (this) { final ItemStatus prevStatus = this.getStatus(); Assertions.assertTrue(status != prevStatus, "duplicate setStatus call"); - this.status = status; // this.statusHistory.add(Pair.of(status, System.currentTimeMillis())); final int compare = Integer.compare(status.ordinal(), prevStatus.ordinal()); if (compare < 0) { // status downgrade Assertions.assertTrue(prevStatus.getPrev() == status, "Invalid status downgrade"); - final ItemStatus targetStatus = this.getTargetStatus(); - for (int i = prevStatus.ordinal(); i < this.futures.length; i ++) { - if (i > targetStatus.ordinal()) { - this.futures[i].completeExceptionally(UNLOADED_EXCEPTION); - this.futures[i] = UNLOADED_FUTURE; - } else { - this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i]; + if (this.getTargetStatus().ordinal() > status.ordinal()) { + return false; + } + + this.status = status; + synchronized (this.futures) { + final ItemStatus targetStatus = this.getTargetStatus(); + for (int i = prevStatus.ordinal(); i < this.futures.length; i ++) { + if (i > targetStatus.ordinal()) { + this.futures[i].completeExceptionally(UNLOADED_EXCEPTION); + this.futures[i] = UNLOADED_FUTURE; + } else { + this.futures[i] = this.futures[i].isDone() ? new CompletableFuture<>() : this.futures[i]; + } } } } else if (compare > 0) { // status upgrade Assertions.assertTrue(prevStatus.getNext() == status, "Invalid status upgrade"); + this.status = status; final CompletableFuture future = this.futures[status.ordinal()]; Assertions.assertTrue(future != UNLOADED_FUTURE); @@ -210,6 +227,7 @@ public void setStatus(ItemStatus status) { if (futureToFire != null) { futureToFire.complete(null); } + return true; } public synchronized void setDependencies(ItemStatus status, KeyStatusPair[] dependencies) { @@ -260,9 +278,10 @@ public void setFlag(int flag) { this.flags.getAndUpdate(operand -> operand | flag); } - public void release(SimpleObjectPool> ticketSetPool) { + void release() { + assertOpen(); + this.tickets.assertEmpty(); setFlag(FLAG_REMOVED); - ticketSetPool.release(this.tickets); } public void addDependencyTicket(StatusAdvancingScheduler scheduler, K key, ItemStatus status, Runnable callback) { @@ -337,7 +356,11 @@ public void cleanupDependencies(StatusAdvancingScheduler scheduler } private void assertOpen() { - Assertions.assertTrue((this.getFlags() & FLAG_REMOVED) == 0); + Assertions.assertTrue(isOpen()); + } + + public boolean isOpen() { + return (this.getFlags() & FLAG_REMOVED) == 0; } private static class DependencyInfo { diff --git a/src/main/java/com/ishland/flowsched/scheduler/ItemTicket.java b/src/main/java/com/ishland/flowsched/scheduler/ItemTicket.java index f7a2c1f..b5e3808 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/ItemTicket.java +++ b/src/main/java/com/ishland/flowsched/scheduler/ItemTicket.java @@ -1,14 +1,17 @@ package com.ishland.flowsched.scheduler; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class ItemTicket { + private static final AtomicReferenceFieldUpdater CALLBACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ItemTicket.class, Runnable.class, "callback"); + private final TicketType type; private final Object source; private final ItemStatus targetStatus; - private Runnable callback = null; - private int hash = 0; + private volatile Runnable callback = null; +// private int hash = 0; public ItemTicket(TicketType type, Object source, ItemStatus targetStatus, Runnable callback) { this.type = Objects.requireNonNull(type); @@ -30,14 +33,13 @@ public TicketType getType() { } public void consumeCallback() { - if (this.callback == null) return; - Runnable callback; - synchronized (this) { - callback = this.callback; - this.callback = null; - } + Runnable callback = CALLBACK_UPDATER.getAndSet(this, null); if (callback != null) { - callback.run(); + try { + callback.run(); + } catch (Throwable t) { + t.printStackTrace(); + } } } @@ -49,11 +51,11 @@ public boolean equals(Object o) { return type == that.type && Objects.equals(source, that.source) && Objects.equals(targetStatus, that.targetStatus); } - public boolean equalsAlternative(ItemTicket that) { - if (this == that) return true; - if (that == null) return false; - return type == that.type && Objects.equals(source, that.source); - } +// public boolean equalsAlternative(ItemTicket that) { +// if (this == that) return true; +// if (that == null) return false; +// return type == that.type && Objects.equals(source, that.source); +// } @Override public int hashCode() { @@ -66,18 +68,18 @@ public int hashCode() { return result; } - public int hashCodeAlternative() { - int hc = hash; - if (hc == 0) { - // inlined version of Objects.hash(type, source, targetStatus) - int result = 1; - - result = 31 * result + type.hashCode(); - result = 31 * result + source.hashCode(); - hc = hash = result; - } - return hc; - } +// public int hashCodeAlternative() { +// int hc = hash; +// if (hc == 0) { +// // inlined version of Objects.hash(type, source, targetStatus) +// int result = 1; +// +// result = 31 * result + type.hashCode(); +// result = 31 * result + source.hashCode(); +// hc = hash = result; +// } +// return hc; +// } public enum TicketType { EXTERNAL, diff --git a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java index 6a79bc0..a469ce3 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java +++ b/src/main/java/com/ishland/flowsched/scheduler/StatusAdvancingScheduler.java @@ -1,6 +1,5 @@ package com.ishland.flowsched.scheduler; -import com.ishland.flowsched.structs.SimpleObjectPool; import com.ishland.flowsched.util.Assertions; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Scheduler; @@ -43,12 +42,12 @@ protected void rehash(int newN) { } } }; - private final SimpleObjectPool> ticketSetPool = new SimpleObjectPool<>( - unused -> new TicketSet<>(getUnloadedStatus()), - TicketSet::clear, - TicketSet::clear, - 4096 - ); +// private final SimpleObjectPool> ticketSetPool = new SimpleObjectPool<>( +// unused -> new TicketSet<>(getUnloadedStatus()), +// TicketSet::clear, +// TicketSet::clear, +// 4096 +// ); protected Queue createPendingUpdatesQueue() { return new ConcurrentLinkedQueue<>(); @@ -137,7 +136,7 @@ public boolean tick() { if (current.equals(getUnloadedStatus())) { // System.out.println("Unloaded: " + key); this.onItemRemoval(holder); - holder.release(ticketSetPool); + holder.release(); final long lock = this.itemsLock.writeLock(); try { this.items.remove(key); @@ -152,9 +151,12 @@ public boolean tick() { if ((holder.getFlags() & ItemHolder.FLAG_BROKEN) != 0) { continue; // not allowed to upgrade } - holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getExecutor())); + holder.submitOp(CompletableFuture.runAsync(() -> advanceStatus0(holder, nextStatus, key), getBackgroundExecutor())); } else { - holder.setStatus(nextStatus); + final boolean success = holder.setStatus(nextStatus); + if (!success) { + continue; // target status is modified to be higher + } holder.submitOp(CompletableFuture.runAsync(() -> downgradeStatus0(holder, current, nextStatus, key), getExecutor())); } } @@ -216,7 +218,6 @@ private void advanceStatus0(ItemHolder holder, ItemStatus { Assertions.assertTrue(holder.isBusy()); @@ -396,13 +397,18 @@ private ItemHolder addTicket0(K key, ItemTicket ItemHolder holder = this.getOrCreateHolder(key); synchronized (holder) { - if ((holder.getFlags() & ItemHolder.FLAG_REMOVED) != 0) { + if (!holder.isOpen()) { // holder got removed before we had chance to add a ticket to it, retry System.out.println(String.format("Retrying addTicket0(%s, %s)", key, ticket)); continue; } + holder.busyRefCounter().incrementRefCount(); + } + try { holder.addTicket(ticket); markDirty(key); + } finally { + holder.busyRefCounter().decrementRefCount(); } return holder; } @@ -413,7 +419,7 @@ private ItemHolder addTicket0(K key, ItemTicket } private ItemHolder createHolder(K k) { - final ItemHolder holder1 = new ItemHolder<>(this.getUnloadedStatus(), k, ticketSetPool); + final ItemHolder holder1 = new ItemHolder<>(this.getUnloadedStatus(), k); this.onItemCreation(holder1); VarHandle.fullFence(); return holder1; diff --git a/src/main/java/com/ishland/flowsched/scheduler/TicketSet.java b/src/main/java/com/ishland/flowsched/scheduler/TicketSet.java index dcc51ae..2e5c359 100644 --- a/src/main/java/com/ishland/flowsched/scheduler/TicketSet.java +++ b/src/main/java/com/ishland/flowsched/scheduler/TicketSet.java @@ -1,42 +1,27 @@ package com.ishland.flowsched.scheduler; import com.ishland.flowsched.util.Assertions; -import it.unimi.dsi.fastutil.Hash; -import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet; -import it.unimi.dsi.fastutil.objects.ObjectSet; import java.lang.invoke.VarHandle; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class TicketSet { +// private static final AtomicIntegerFieldUpdater targetStatusUpdater = AtomicIntegerFieldUpdater.newUpdater(TicketSet.class, "targetStatus"); + private final ItemStatus initialStatus; - private final ObjectOpenCustomHashSet>[] status2Tickets; - private int targetStatus = 0; + private final Set>[] status2Tickets; +// private volatile int targetStatus = 0; public TicketSet(ItemStatus initialStatus) { this.initialStatus = initialStatus; - this.targetStatus = initialStatus.ordinal(); +// this.targetStatus = initialStatus.ordinal(); ItemStatus[] allStatuses = initialStatus.getAllStatuses(); - this.status2Tickets = new ObjectOpenCustomHashSet[allStatuses.length]; + this.status2Tickets = new Set[allStatuses.length]; for (int i = 0; i < allStatuses.length; i++) { - this.status2Tickets[i] = new ObjectOpenCustomHashSet<>(new Hash.Strategy>() { - @Override - public int hashCode(ItemTicket o) { - return o.hashCodeAlternative(); - } - - @Override - public boolean equals(ItemTicket a, ItemTicket b) { - return a.equalsAlternative(b); - } - }) { - @Override - protected void rehash(int newN) { - if (n < newN) { - super.rehash(newN); - } - } - }; + this.status2Tickets[i] = Collections.newSetFromMap(new ConcurrentHashMap<>()); } VarHandle.fullFence(); } @@ -46,9 +31,10 @@ public boolean add(ItemTicket ticket) { final boolean added = this.status2Tickets[targetStatus.ordinal()].add(ticket); if (!added) return false; - if (this.targetStatus < targetStatus.ordinal()) { - this.targetStatus = targetStatus.ordinal(); - } +// if (this.targetStatus < targetStatus.ordinal()) { +// this.targetStatus = targetStatus.ordinal(); +// } +// targetStatusUpdater.accumulateAndGet(this, targetStatus.ordinal(), Math::max); return true; } @@ -57,35 +43,58 @@ public boolean remove(ItemTicket ticket) { final boolean removed = this.status2Tickets[targetStatus.ordinal()].remove(ticket); if (!removed) return false; - while (this.status2Tickets[this.targetStatus].isEmpty()) { - if ((-- this.targetStatus) <= 0) { - break; - } - } +// decreaseStatusAtomically(); return true; } +// private void decreaseStatusAtomically() { +// int currentStatus; +// int newStatus; +// do { +// currentStatus = this.targetStatus; +// newStatus = currentStatus; +// +// while (newStatus > 0 && this.status2Tickets[newStatus].isEmpty()) { +// newStatus--; +// } +// +// if (newStatus >= currentStatus) { +// break; // no need to update +// } +// +// } while (!targetStatusUpdater.compareAndSet(this, currentStatus, newStatus)); +// } + public ItemStatus getTargetStatus() { - return this.initialStatus.getAllStatuses()[this.targetStatus]; + return this.initialStatus.getAllStatuses()[this.computeTargetStatusSlow()]; } - public ObjectSet> getTicketsForStatus(ItemStatus status) { + public Set> getTicketsForStatus(ItemStatus status) { return this.status2Tickets[status.ordinal()]; } void clear() { - for (ObjectOpenCustomHashSet> tickets : status2Tickets) { + for (Set> tickets : status2Tickets) { tickets.clear(); } - this.targetStatus = initialStatus.ordinal(); +// this.targetStatus = initialStatus.ordinal(); VarHandle.fullFence(); } void assertEmpty() { - for (ObjectOpenCustomHashSet> tickets : status2Tickets) { + for (Set> tickets : status2Tickets) { Assertions.assertTrue(tickets.isEmpty()); } } + private int computeTargetStatusSlow() { + for (int i = this.status2Tickets.length - 1; i > 0; i--) { + if (!this.status2Tickets[i].isEmpty()) { + return i; + } + } + return 0; + } + }