Skip to content

Commit

Permalink
Have explicit capture set parameter for withResolver cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Aug 26, 2024
1 parent 79ca7fb commit 80d09d3
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 22 deletions.
17 changes: 10 additions & 7 deletions jvm/src/main/scala/PosixLikeIO/PIO.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package PosixLikeIO

import language.experimental.captureChecking
import caps.CapSet

import gears.async.Scheduler
import gears.async.default.given
import gears.async.{Async, Future}
Expand All @@ -17,7 +20,7 @@ import scala.util.{Failure, Success, Try}
import Future.Promise

object File:
extension (resolver: Future.Resolver[Int])
extension[Cap^] (resolver: Future.Resolver[Int, Cap])
private[File] def toCompletionHandler = new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit = resolver.resolve(result)
override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e)
Expand All @@ -44,7 +47,7 @@ class File(val path: String) {
def read(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

Future.withResolver[Int]: resolver =>
Future.withResolver[Int, CapSet]: resolver =>
channel.get.read(
buffer,
0,
Expand All @@ -57,7 +60,7 @@ class File(val path: String) {
assert(size >= 0)

val buffer = ByteBuffer.allocate(size)
Future.withResolver[String]: resolver =>
Future.withResolver[String, CapSet]: resolver =>
channel.get.read(
buffer,
0,
Expand All @@ -72,7 +75,7 @@ class File(val path: String) {
def write(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

Future.withResolver[Int]: resolver =>
Future.withResolver[Int, CapSet]: resolver =>
channel.get.write(
buffer,
0,
Expand Down Expand Up @@ -114,7 +117,7 @@ class SocketUDP() {
def send(data: ByteBuffer, address: String, port: Int): Future[Unit] =
assert(socket.isDefined)

Future.withResolver: resolver =>
Future.withResolver[Unit, CapSet]: resolver =>
resolver.spawn:
val packet: DatagramPacket =
new DatagramPacket(data.array(), data.limit(), InetAddress.getByName(address), port)
Expand All @@ -123,7 +126,7 @@ class SocketUDP() {
def receive(): Future[DatagramPacket] =
assert(socket.isDefined)

Future.withResolver: resolver =>
Future.withResolver[DatagramPacket, CapSet]: resolver =>
resolver.spawn:
val buffer = Array.fill[Byte](10 * 1024)(0)
val packet: DatagramPacket = DatagramPacket(buffer, 10 * 1024)
Expand All @@ -138,7 +141,7 @@ class SocketUDP() {
}

object SocketUDP:
extension [T](resolver: Future.Resolver[T])
extension [T, Cap^](resolver: Future.Resolver[T, Cap])
private[SocketUDP] inline def spawn(body: => T)(using s: Scheduler) =
s.execute(() =>
resolver.complete(Try(body).recover { case _: InterruptedException =>
Expand Down
3 changes: 1 addition & 2 deletions shared/src/main/scala/async/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,7 @@ object Async:
* @see
* [[Async$.select Async.select]] where [[SelectCase]] is used.
*/
/* TODO: inline after cc-ing channels */
def ~~>[U](_f: T => U): SelectCase[U]^{_src, _f} = _src.handle(_f)
inline def ~~>[U](_f: T => U): SelectCase[U]^{_src, _f} = _src.handle(_f)

/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
* [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in
Expand Down
2 changes: 1 addition & 1 deletion shared/src/main/scala/async/ScalaConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object ScalaConverters:
* [[Future]] will *not* clean up the pending job when cancelled.
*/
def asGears(using ExecutionContext): Future[T]^{fut} =
Future.withResolver[T]: resolver =>
Future.withResolver[T, caps.CapSet]: resolver =>
fut.andThen(result => resolver.complete(result))

extension [T](fut: Future[T]^)
Expand Down
20 changes: 11 additions & 9 deletions shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ object Future:
* fail with the failure that was returned first.
*/
def zip[U](f2: Future[U]^): Future[(T, U)]^{f1, f2} =
Future.withResolver: r =>
Future.withResolver[(T, U), caps.CapSet^{f1, f2}]: r =>
Async
.either(f1, f2)
.onComplete(Listener { (v, _) =>
Expand Down Expand Up @@ -246,7 +246,7 @@ object Future:
*/
def orWithCancel(f2: Future[T]^): Future[T]^{f1, f2} = orImpl(true)(f2)

inline def orImpl(inline withCancel: Boolean)(f2: Future[T]^): Future[T]^{f1, f2} = Future.withResolver: r =>
inline def orImpl(inline withCancel: Boolean)(f2: Future[T]^): Future[T]^{f1, f2} = Future.withResolver[T, caps.CapSet^{f1, f2}]: r =>
Async
.raceWithOrigin(f1, f2)
.onComplete(Listener { case ((v, which), _) =>
Expand Down Expand Up @@ -288,7 +288,7 @@ object Future:
/** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of
* resolve/reject/complete may be used and only once.
*/
trait Resolver[-T]:
trait Resolver[-T, Cap^]:
/** Complete the future with a data item successfully */
def resolve(item: T): Unit = complete(Success(item))

Expand All @@ -305,7 +305,7 @@ object Future:
* may be used. The handler should eventually complete the Future using one of complete/resolve/reject*. The
* default handler is set up to [[rejectAsCancelled]] immediately.
*/
def onCancel(handler: () => Unit): Unit
def onCancel(handler: (() -> Unit)^{Cap^}): Unit
end Resolver

/** Create a promise that may be completed asynchronously using external means.
Expand All @@ -315,16 +315,18 @@ object Future:
*
* If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]].
*/
def withResolver[T](body: Resolver[T] => Unit): Future[T] =
val future = new CoreFuture[T] with Resolver[T] with Promise[T]:
@volatile var cancelHandle: () -> Unit = () => rejectAsCancelled()
override def onCancel(handler: () => Unit): Unit = cancelHandle = caps.unsafe.unsafeAssumePure(handler)
def withResolver[T, Cap^](body: Resolver[T, Cap]^{Cap^} => Unit): Future[T]^{Cap^} =
val future: (CoreFuture[T] & Resolver[T, Cap] & Promise[T])^{Cap^} = new CoreFuture[T] with Resolver[T, Cap] with Promise[T]:
// TODO: undo this once bug is fixed
@volatile var cancelHandle: (() -> Unit) = () => rejectAsCancelled()
override def onCancel(handler: (() -> Unit)^{Cap^}): Unit =
cancelHandle = /* TODO remove */ caps.unsafe.unsafeAssumePure(handler)
override def complete(result: Try[T]): Unit = super.complete(result)

override def cancel(): Unit =
if setCancelled() then cancelHandle()
end future
body(future: Resolver[T])
body(future)
future
end withResolver

Expand Down
6 changes: 3 additions & 3 deletions shared/src/test/scala/FutureBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ class FutureBehavior extends munit.FunSuite {
}

test("Future.withResolver cancel handler is not run after being completed") {
val num = AtomicInteger(0)
val fut = Future.withResolver[Int]: r =>
val num: AtomicInteger^ = AtomicInteger(0)
val fut = Future.withResolver[Int, caps.CapSet^{num}]: r =>
r.onCancel { () => num.incrementAndGet() }
r.resolve(1)
fut.cancel()
Expand All @@ -343,7 +343,7 @@ class FutureBehavior extends munit.FunSuite {

test("Future.withResolver is only completed after handler decides") {
val prom = Future.Promise[Unit]()
val fut = Future.withResolver[Unit]: r =>
val fut = Future.withResolver[Unit, caps.CapSet]: r =>
r.onCancel(() => prom.onComplete(Listener { (_, _) => r.rejectAsCancelled() }))

assert(fut.poll().isEmpty)
Expand Down

0 comments on commit 80d09d3

Please sign in to comment.