From f7eb38ee13c4a889dc9b2ca0149eb55dd70af97f Mon Sep 17 00:00:00 2001 From: Caleb Hulbert Date: Tue, 9 Jul 2024 16:10:33 -0400 Subject: [PATCH] refactor(BREAKING): Back [Tasks] with coroutines instead of javafx Tasks. Really just meant for coroutine compatibility with Java. In most cases in Kotlin code, it is sufficient to use coroutines/async/await directly --- .../org/janelia/saalfeldlab/fx/Tasks.kt | 256 +++--------------- .../saalfeldlab/fx/midi/MidiActionSet.kt | 4 +- .../saalfeldlab/fx/ui/MatchSelection.kt | 4 +- .../saalfeldlab/fx/undo/UndoFromEvents.kt | 2 +- .../org/janelia/saalfeldlab/fx/TasksTest.kt | 152 ++++------- 5 files changed, 106 insertions(+), 312 deletions(-) diff --git a/src/main/kotlin/org/janelia/saalfeldlab/fx/Tasks.kt b/src/main/kotlin/org/janelia/saalfeldlab/fx/Tasks.kt index 4fcbf45..060f5ef 100644 --- a/src/main/kotlin/org/janelia/saalfeldlab/fx/Tasks.kt +++ b/src/main/kotlin/org/janelia/saalfeldlab/fx/Tasks.kt @@ -1,264 +1,96 @@ package org.janelia.saalfeldlab.fx -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.github.oshai.kotlinlogging.KotlinLogging -import javafx.beans.value.ChangeListener import javafx.concurrent.Task -import javafx.concurrent.Worker import javafx.concurrent.Worker.State.* -import javafx.concurrent.WorkerStateEvent -import javafx.event.EventHandler -import org.janelia.saalfeldlab.fx.util.InvokeOnJavaFXApplicationThread -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.ThreadFactory +import kotlinx.coroutines.* import java.util.function.BiConsumer import java.util.function.Consumer -import java.util.function.Function +import java.util.function.Supplier -/** - * Utility class for workign with [UtilityTask] - */ class Tasks private constructor() { companion object { @JvmSynthetic - fun createTask(call: (UtilityTask) -> T): UtilityTask { - return UtilityTask(call) + fun createTask(call: suspend () -> T): UtilityTask { + return UtilityTask(CoroutineScope(Dispatchers.Default)) { call() } } @JvmStatic - fun createTask(call: Function, T>): UtilityTask { - return createTask { call.apply(it) } + fun createTask(call: Supplier): UtilityTask { + return createTask { call.get() } } @JvmStatic - fun createTask(call: Consumer>): UtilityTask { - return createTask { call.accept(it) } + fun createTask(call: Runnable): UtilityTask { + return createTask { call.run() } } } } -private val THREAD_FACTORY: ThreadFactory = ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("task-thread-%d") - .build() -private val TASK_SERVICE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1, THREAD_FACTORY) - -/** - * Convenience wrapper class around [Task] - * - * @param V type of super class Task - * @property onCall called during [Task.call], but wrapped with exception handling - * @constructor Create empty Utility task - */ -class UtilityTask(private val onCall: (UtilityTask) -> V) : Task() { - - private var executorService : ExecutorService = TASK_SERVICE - private var onFailedSet = false +@Suppress("OPT_IN_USAGE") +class UtilityTask internal constructor( + private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default), + private val block: suspend CoroutineScope.() -> V +) : Deferred by scope.async(block = block) { companion object { - private val LOG = KotlinLogging.logger { } - } - - override fun call(): V? { - try { - /* If no `onEnd/onFail` has been set, then we should listen for thrown exceptions and throw them */ - if (!onFailedSet) setDefaultOnFailed() - return onCall(this) - } catch (e: Exception) { - if (isCancelled) { - LOG.trace(e) { "Task Cancelled (cancelled=$isCancelled)" } - return null - } - throw RuntimeException(e) - } + private val LOG = KotlinLogging.logger { } } - public override fun updateValue(value: V) { - super.updateValue(value) - } - - private fun setDefaultOnFailed() { - InvokeOnJavaFXApplicationThread { - this.onFailed { _, task -> LOG.error(task.exception) {"Task Failed"} } + @JvmSynthetic + fun onSuccess(onSuccess: (V) -> Unit) = apply { + invokeOnCompletion { cause -> + cause ?: onSuccess(getCompleted()) } } - /** - * Builder-style function to set [SUCCEEDED] callback. - * - * @param append flag to determine behavior if an existing `onSuccess` callback is present: - * - if `true`, the current callback will be called prior to this `consumer` being called - * - if `false`, the prior callback will be removed and never called. - * - if `null`, this will throw a runtime exception if an existing callback is present. - * - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified - * @param consumer to be called when [SUCCEEDED] - * @return this - */ - @JvmSynthetic - fun onSuccess(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask) -> Unit): UtilityTask { - val appendCallbacks = onSucceeded?.appendCallbacks(append, consumer) - val consumerEvent = EventHandler { event -> consumer(event, this) } - setOnSucceeded(appendCallbacks ?: consumerEvent) - return this + fun onSuccess(onSuccess: Consumer) = apply { + onSuccess { onSuccess.accept(it) } } - /** - * Builder-style function to set [CANCELLED] callback. - * - * @param append flag to determine behavior if an existing `onCancelled` callback is present: - * - if `true`, the current callback will be called prior to this `consumer` being called - * - if `false`, the prior callback will be removed and never called. - * - if `null`, this will throw a runtime exception if an existing callback is present. - * - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified - * @param consumer to be called when [CANCELLED] - * @return this - */ @JvmSynthetic - fun onCancelled(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask) -> Unit): UtilityTask { - val appendCallbacks = onCancelled?.appendCallbacks(append, consumer) - val consumerEvent = EventHandler { event -> consumer(event, this) } - setOnCancelled(appendCallbacks ?: consumerEvent) - return this + fun onCancelled(onCancelled: (CancellationException) -> Unit) = apply { + invokeOnCompletion { cause -> + (cause as? CancellationException)?.let { onCancelled(it) } + } } - /** - * Builder-style function to set [FAILED] callback. - * - * @param append flag to determine behavior if an existing `onFailed` callback is present: - * - if `true`, the current callback will be called prior to this `consumer` being called - * - if `false`, the prior callback will be removed and never called. - * - if `null`, this will throw a runtime exception if an existing callback is present. - * - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified - * @param consumer to be called when [FAILED] - * @return this - */ - @JvmSynthetic - fun onFailed(append: Boolean? = null, consumer: (WorkerStateEvent, UtilityTask) -> Unit): UtilityTask { - this.onFailedSet = true - val eventHandler = onFailed?.appendCallbacks(append, consumer) ?: EventHandler { event -> consumer(event, this) } - this.setOnFailed(eventHandler) - return this + fun onCancelled(onCancelled: Consumer) = apply { + onCancelled { onCancelled.accept(it) } } - - private var onEndListener: ChangeListener? = null - - /** - * Builder-style function to set when the task ends, either by [SUCCEEDED], [CANCELLED], or [FAILED]. - * - * @param append flag to determine behavior if an existing `onEnd` callback is present: - * - if `true`, the current callback will be called prior to this `consumer` being called - * - if `false`, the prior callback will be removed and never called. - * - if `null`, this will throw a runtime exception if an existing callback is present. - * - This is meant to help unintended overrides of existing callbacks when `append` is not explicitly specified - * @param consumer to be called when task ends - * @return this - */ @JvmSynthetic - fun onEnd(append: Boolean? = null, consumer: (UtilityTask) -> Unit): UtilityTask { - //TODO Caleb: Consider renaming `onEnd` to `finally` since this is trigger on end for ANY reason, even if an - // Exception was thrown. Or Maybe a separate `finally` which does what this currently does, and then change `onEnd` - // to NOT trigger if an excpetion occures (that isn't handled by the exception handler) - onEndListener = onEndListener?.let { oldListener -> - stateProperty().removeListener(oldListener) - if (append == null) - throw TaskStateCallbackOverrideException("Overriding existing handler; If intentional, pass `false` for `append`") - if (append) { - ChangeListener { obs, oldv, newv -> - when (newv) { - SUCCEEDED, CANCELLED, FAILED -> { - oldListener.changed(obs, oldv, newv) - consumer(this) - } - - else -> Unit - } - } - } else null - } ?: ChangeListener { _, _, newv -> - when (newv) { - SUCCEEDED, CANCELLED, FAILED -> consumer(this) - else -> Unit + fun onFailed(onFailed: (Throwable) -> Unit) = apply { + invokeOnCompletion { cause -> + when (cause) { + null, is CancellationException -> Unit + else -> onFailed(cause) } } - this.stateProperty().addListener(onEndListener) - return this - } - - - /** - * - * @see [onSuccess] - */ - @JvmOverloads - fun onSuccess(append: Boolean? = null, consumer: BiConsumer>): UtilityTask { - return onSuccess(append) { e, t -> consumer.accept(e, t) } } - /** - * - * @see [onCancelled] - */ - @JvmOverloads - fun onCancelled(append: Boolean? = null, consumer: BiConsumer>): UtilityTask { - return onCancelled(append) { e, t -> consumer.accept(e, t) } + fun onFailed(onFailed: Consumer) = apply { + onFailed { onFailed.accept(it) } } - /** - * - * @see [onFailed] - */@JvmOverloads - fun onFailed(append: Boolean? = null, consumer: BiConsumer>): UtilityTask { - return onFailed(append) { e, t -> consumer.accept(e, t) } - } - /** - * - * @see [onEnd] - */ - @JvmOverloads - fun onEnd(append: Boolean? = null, consumer: Consumer>): UtilityTask { - return onEnd(append) { t -> consumer.accept(t) } + @JvmSynthetic + fun onEnd(onEnd: (V?, Throwable?) -> Unit) = apply { + invokeOnCompletion { cause -> + val (value, error) = cause?.let { null to it } ?: (getCompleted() to null) + onEnd(value, error) + } } - /** - * Submit this task to the [executorService]. - * - * @param executorService to execute this task on. - * @return this task - */ - @JvmOverloads - fun submit(executorService: ExecutorService = this.executorService) : UtilityTask { - this.executorService = executorService - this.executorService.submit(this) - return this + fun onEnd(onEnd: BiConsumer) = apply { + onEnd { result, cause -> onEnd.accept(result, cause) } } - /** - * Submit this task to the [executorService], and block while waiting for it to return. - * This will return after the task completes, but possibbly BEFORE the [onSuccess]/[onEnd] call finish. - * - * @param executorService to execute this task on. - * @return the result of this task, blocking if not yet done. - */ - @JvmOverloads - fun submitAndWait(executorService: ExecutorService = this.executorService): V { - this.executorService = executorService - this.executorService.submit(this) - return this.get() - } + fun get() = runBlocking { await() } - private fun EventHandler.appendCallbacks(append: Boolean? = false, consumer: (WorkerStateEvent, UtilityTask) -> Unit): EventHandler? { - if (append == null) throw TaskStateCallbackOverrideException("Overriding existing handler; If intentional, pass `false` for `append`") - if (!append) return null - return EventHandler { event -> - this.handle(event) - consumer(event, this@UtilityTask) - } + fun wait() = apply { + runBlocking { join() } } - - private class TaskStateCallbackOverrideException(override val message: String?) : RuntimeException(message) } diff --git a/src/main/kotlin/org/janelia/saalfeldlab/fx/midi/MidiActionSet.kt b/src/main/kotlin/org/janelia/saalfeldlab/fx/midi/MidiActionSet.kt index bee0836..fb9efee 100644 --- a/src/main/kotlin/org/janelia/saalfeldlab/fx/midi/MidiActionSet.kt +++ b/src/main/kotlin/org/janelia/saalfeldlab/fx/midi/MidiActionSet.kt @@ -80,8 +80,8 @@ abstract class MidiAction(eventType: EventType, val device: override val logger: KLogger by lazy { val simpleName = this::class.simpleName?.let { ".$it" } ?: "" - val name = ".${this.name ?: "event-${eventType.name}"}" - KotlinLogging.logger("saalfx.action.midi$simpleName$name.$handle") + val finalName = ".${this.name ?: "event-${eventType.name}"}" + KotlinLogging.logger("saalfx.action.midi$simpleName$finalName.$handle") } init { diff --git a/src/main/kotlin/org/janelia/saalfeldlab/fx/ui/MatchSelection.kt b/src/main/kotlin/org/janelia/saalfeldlab/fx/ui/MatchSelection.kt index 253f49f..fb1fdff 100644 --- a/src/main/kotlin/org/janelia/saalfeldlab/fx/ui/MatchSelection.kt +++ b/src/main/kotlin/org/janelia/saalfeldlab/fx/ui/MatchSelection.kt @@ -206,13 +206,13 @@ class MatchSelection( companion object { fun registerStyleSheet(styleable : Scene) { - MatchSelection.javaClass.getResource("matcher.css")?.toExternalForm()?.let { css -> + MatchSelection::class.java.getResource("matcher.css")?.toExternalForm()?.let { css -> styleable.stylesheets.add(css) } } fun registerStyleSheet(styleable : Parent) { - MatchSelection.javaClass.getResource("matcher.css")?.toExternalForm()?.let { css -> + MatchSelection::class.java.getResource("matcher.css")?.toExternalForm()?.let { css -> styleable.stylesheets.add(css) } } diff --git a/src/main/kotlin/org/janelia/saalfeldlab/fx/undo/UndoFromEvents.kt b/src/main/kotlin/org/janelia/saalfeldlab/fx/undo/UndoFromEvents.kt index 142b0b0..91755e8 100644 --- a/src/main/kotlin/org/janelia/saalfeldlab/fx/undo/UndoFromEvents.kt +++ b/src/main/kotlin/org/janelia/saalfeldlab/fx/undo/UndoFromEvents.kt @@ -145,7 +145,7 @@ class UndoFromEvents( } this.currentEventIndex.set(events.size - 1) nodes.reverse() - InvokeOnJavaFXApplicationThread.invoke { this.eventBox.children.setAll(nodes) } + InvokeOnJavaFXApplicationThread.invoke { this@UndoFromEvents.eventBox.children.setAll(nodes) } } companion object { diff --git a/src/test/kotlin/org/janelia/saalfeldlab/fx/TasksTest.kt b/src/test/kotlin/org/janelia/saalfeldlab/fx/TasksTest.kt index a9d688c..c8ea271 100644 --- a/src/test/kotlin/org/janelia/saalfeldlab/fx/TasksTest.kt +++ b/src/test/kotlin/org/janelia/saalfeldlab/fx/TasksTest.kt @@ -8,13 +8,13 @@ import javafx.scene.control.ListView import javafx.scene.input.MouseEvent import javafx.scene.layout.Pane import javafx.stage.Stage +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import org.janelia.saalfeldlab.fx.util.InvokeOnJavaFXApplicationThread import org.junit.Assert import org.testfx.framework.junit.ApplicationTest import org.testfx.util.WaitForAsyncUtils import java.io.PrintStream -import java.time.LocalDateTime -import java.time.temporal.ChronoUnit import kotlin.coroutines.cancellation.CancellationException import kotlin.test.BeforeTest import kotlin.test.Test @@ -49,8 +49,7 @@ class TasksTest : ApplicationTest() { fun `onSuccess runs when successful`() { val testText = "Single onSuccess Test" Tasks.createTask { testText } - .onSuccess { _, t -> list.items.add(t.value) } - .submit() + .onSuccess { list.items.add(it) } WaitForAsyncUtils.waitForFxEvents() @@ -63,15 +62,16 @@ class TasksTest : ApplicationTest() { fun `onEnd and OnSuccess run when successful`() { val endOnSuccessText = "Single onEnd Test, expecting success" val task = Tasks.createTask { endOnSuccessText } - .onSuccess { _, t -> list.items.add(t.value) } - .onEnd { t -> list.items.add(t.value) } - .submit() + .onSuccess { list.items.add(it) } + .onEnd { result, _ -> list.items.add(result!!) } - WaitForAsyncUtils.waitForFxEvents() + val result = runBlocking { + task.await() + } val items = list.items - Assert.assertTrue(task.isDone) - Assert.assertEquals(endOnSuccessText, task.get()) + Assert.assertTrue(task.isCompleted) + Assert.assertEquals(endOnSuccessText, result) Assert.assertArrayEquals(arrayOf(endOnSuccessText, endOnSuccessText), items.toTypedArray()) } @@ -79,11 +79,9 @@ class TasksTest : ApplicationTest() { fun `onEnd and onSuccess run after blocking when successful`() { val endOnSuccessText = "Single onEnd Test, expecting success" val result = Tasks.createTask { endOnSuccessText } - .onSuccess { _, t -> list.items.add(t.value) } - .onEnd { t -> list.items.add(t.value) } - .submitAndWait() - - WaitForAsyncUtils.waitForFxEvents() + .onSuccess { list.items.add(it) } + .onEnd { result, _ -> list.items.add(result!!) } + .get() val items = list.items Assert.assertEquals(endOnSuccessText, result) @@ -96,17 +94,15 @@ class TasksTest : ApplicationTest() { val textWithoutCancel = "Single onEnd Test, expecting to never see this" val textWithCancel = "Single onEnd Test, expecting cancel" var canceled = false - val maxTime = LocalDateTime.now().plus(5, ChronoUnit.SECONDS) val task = Tasks.createTask { /* waiting for the task to be canceled. If too long, we have failed. */ - while (!canceled || LocalDateTime.now().isBefore(maxTime)) { - sleep(20) + while (!canceled) { + delay(5000) } textWithoutCancel } - .onSuccess { _, t -> list.items.add(t.get()) } - .onEnd { list.items.add(textWithCancel) } - .submit() + .onSuccess { list.items.add(it) } + .onEnd { _, _ -> list.items.add(textWithCancel) } task.cancel() canceled = true @@ -140,24 +136,18 @@ class TasksTest : ApplicationTest() { val task: UtilityTask<*> try { task = Tasks.createTask { throw ExceptionTestException() } - .onSuccess { _, t -> list.items.add(t.get()) } - .onEnd { list.items.add(textWithFailure) } - .submit() - WaitForAsyncUtils.waitForFxEvents() + .onSuccess { list.items.add(it) } + .onEnd { _, _ -> list.items.add(textWithFailure) } + .onFailed { assertIs(it) } + .wait() } finally { System.setOut(stdout) System.setErr(stderr) } - - - Assert.assertFalse(task.isCancelled) - Assert.assertTrue(task.isDone) + Assert.assertTrue(task.isCancelled) + Assert.assertTrue(task.isCompleted) Assert.assertArrayEquals(arrayOf(textWithFailure), items.toTypedArray()) - - InvokeOnJavaFXApplicationThread.invokeAndWait { - assert(task.exception.cause is ExceptionTestException) - } } @Test @@ -167,51 +157,47 @@ class TasksTest : ApplicationTest() { val textWithFailure = "Single onFailure Test, expecting failure" /* Intentionally trigger failure, with custom onFailed */ val task = Tasks.createTask { throw ExceptionTestException() } - .onSuccess { _, t -> list.items.add(t.get()) } - .onEnd { list.items.add(textWithEnd) } - .onFailed { _, _ -> list.items.add(textWithFailure) } - .submit() + .onSuccess { list.items.add(it) } + .onEnd { _, _ -> list.items.add(textWithEnd) } + .onFailed { list.items.add(textWithFailure) } + .onFailed { assertIs(it) } + .wait() - WaitForAsyncUtils.waitForFxEvents() - - Assert.assertFalse(task.isCancelled) - Assert.assertTrue(task.isDone) + Assert.assertTrue(task.isCancelled) + Assert.assertTrue(task.isCompleted) Assert.assertArrayEquals(arrayOf(textWithEnd, textWithFailure), items.toTypedArray()) - - InvokeOnJavaFXApplicationThread.invokeAndWait { - assert(task.exception.cause is ExceptionTestException) - } } @Test - fun `appending callbacks run in order when successful`() { + fun `multiple callbacks run in order when successful`() { var success = 0 var end = 0 Tasks.createTask { "asdf" } - .onSuccess { _, _ -> success += 1 } - .onSuccess(true) { _, _ -> success *= 3 } - .onEnd { end += 1 } - .onEnd(true) { end *= 3 } - .submitAndWait() + .onSuccess { success += 1 } + .onSuccess { success *= 3 } + .onEnd { _, _ -> end += 1 } + .onEnd { _, _ -> end *= 3 } + .wait() - WaitForAsyncUtils.waitForFxEvents() assertEquals(3, success) assertEquals(3, end) var cancelled = 0 Tasks.createTask { "asdf" - Thread.sleep(100) + delay(1000) } - .onSuccess { _, _ -> success += 1 } - .onSuccess(true) { _, _ -> success *= 3 } - .onCancelled { _, _ -> cancelled += 1 } - .onCancelled(true) { _, _ -> cancelled *= 3 } - .onEnd { end += 1 } - .onEnd(true) { end *= 3 } - .submit().also { it.cancel() } + .onSuccess { success += 1 } + .onSuccess { success *= 3 } + .onCancelled { cancelled += 1 } + .onCancelled { cancelled *= 3 } + .onEnd { _, _ -> end += 1 } + .onEnd { _, _ -> end *= 3 } + .also { + it.cancel() + it.wait() + } - WaitForAsyncUtils.waitForFxEvents() assertEquals(3, success) assertEquals(3, cancelled) assertEquals(12, end) @@ -221,15 +207,15 @@ class TasksTest : ApplicationTest() { "asdf" throw ExceptionTestException() } - .onSuccess { _, _ -> success += 1 } - .onSuccess(true) { _, _ -> success *= 3 } - .onCancelled { _, _ -> cancelled += 1 } - .onCancelled(true) { _, _ -> cancelled *= 3 } - .onEnd { end += 1 } - .onEnd(true) { end *= 3 } - .onFailed { _, _ -> failed += 1} - .onFailed(true) { _, _ -> failed *= 3} - .submit() + .onSuccess { success += 1 } + .onSuccess { success *= 3 } + .onCancelled { cancelled += 1 } + .onCancelled { cancelled *= 3 } + .onEnd { _, _ -> end += 1 } + .onEnd { _, _ -> end *= 3 } + .onFailed { failed += 1 } + .onFailed { failed *= 3 } + .wait() WaitForAsyncUtils.waitForFxEvents() assertEquals(3, success) @@ -238,32 +224,8 @@ class TasksTest : ApplicationTest() { assertEquals(39, end) } - @Test - fun `append callbacks works as expected`() { - Assert.assertThrows(RuntimeException::class.java) { - Tasks.createTask { "asdf" } - .onSuccess { _, _ -> } - .onSuccess { _, _ -> } - } - Assert.assertThrows(RuntimeException::class.java) { - Tasks.createTask { "asdf" } - .onEnd { } - .onEnd { } - } - Assert.assertThrows(RuntimeException::class.java) { - Tasks.createTask { "asdf" } - .onFailed{ _, _ -> } - .onFailed{ _, _ -> } - } - Assert.assertThrows(RuntimeException::class.java) { - Tasks.createTask { "asdf" } - .onCancelled { _, _ -> } - .onCancelled { _, _ -> } - } - } - companion object { - private val LOG = KotlinLogging.logger { } + private val LOG = KotlinLogging.logger { } const val SCENE_WIDTH = 800.0