Async primitives #85

Open
wants to merge 8 commits into main
34 changes: 23 additions & 11 deletions shared/src/main/scala/async/Async.scala
@@ -95,21 +95,33 @@ object Async:
def group[T](body: Async.Spawn ?=> T)(using Async): T =
withNewCompletionGroup(CompletionGroup().link())(body)

private def cancelAndWaitGroup(group: CompletionGroup)(using async: Async) =
val completionAsync =
if CompletionGroup.Unlinked == async.group
then async
else async.withGroup(CompletionGroup.Unlinked)
group.cancel()
group.waitCompletion()(using completionAsync)

/** Runs a body within another completion group. When the body returns, the group is cancelled and its completion
* awaited with the `Unlinked` group.
*/
private[async] def withNewCompletionGroup[T](group: CompletionGroup)(body: Async.Spawn ?=> T)(using
async: Async
): T =
val completionAsync =
if CompletionGroup.Unlinked == async.group
then async
else async.withGroup(CompletionGroup.Unlinked)

try body(using async.withGroup(group))
finally
group.cancel()
group.waitCompletion()(using completionAsync)
finally cancelAndWaitGroup(group)(using async)

/** A Resource that grants access to the [[Spawn]] capability. On cleanup, every spawned [[Future]] is cancelled and
* awaited, similar to [[Async.group]].
*
* Note that the [[Spawn]] from the resource must not be used for awaiting after allocation.
*/
val spawning = new Resource[Spawn]:
override def use[V](body: Spawn => V)(using Async): V = group(spawn ?=> body(spawn))
override def allocated(using allocAsync: Async): (Spawn, (Async) ?=> Unit) =
val group = CompletionGroup() // not linked to allocAsync's group because it would not unlink itself
(allocAsync.withGroup(group), closeAsync ?=> cancelAndWaitGroup(group)(using closeAsync))

/** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass the same
* data to calls of [[Source!.poll]] and [[Source!.onComplete]]. An ephemeral source can pass new data in every call.
@@ -318,14 +330,14 @@ object Async:
def acquire() =
if found then false
else
acquireLock()
numberedLock.lock()
if found then
releaseLock()
numberedLock.unlock()
// no cleanup needed here, since we have done this by an earlier `complete` or `lockNext`
false
else true
def release() =
releaseLock()
numberedLock.unlock()

var found = false

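A rough usage sketch of the two spawning styles touched by this file (not part of the diff): `Async.group` scopes spawned work to a block, while the new `Async.spawning` resource hands out a `Spawn` capability together with a close handle that the caller must invoke. `doWork`, `structured` and `unstructured` are hypothetical placeholders.

import gears.async.Async

// Hypothetical workload that spawns Futures via the given Spawn capability.
def doWork()(using Async.Spawn): Unit = ()

// Structured: every spawned Future is cancelled and awaited when the body returns.
def structured()(using Async): Unit =
  Async.group:
    doWork()

// Unstructured: the caller is responsible for invoking the returned close handle.
def unstructured()(using async: Async): Unit =
  val (spawn, close) = Async.spawning.allocated
  try doWork()(using spawn)
  finally close(using async) // cancels and awaits everything spawned via `spawn`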
8 changes: 3 additions & 5 deletions shared/src/main/scala/async/Listener.scala
@@ -2,6 +2,7 @@ package gears.async

import gears.async.Async.Source

import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec

@@ -100,11 +101,8 @@ object Listener:
trait NumberedLock:
import NumberedLock._

val number = listenerNumber.getAndIncrement()
private val lock0 = ReentrantLock()

protected def acquireLock() = lock0.lock()
protected def releaseLock() = lock0.unlock()
protected val number = listenerNumber.getAndIncrement()
protected val numberedLock: Lock = ReentrantLock()

object NumberedLock:
private val listenerNumber = java.util.concurrent.atomic.AtomicLong()
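The refactor above removes the `acquireLock`/`releaseLock` wrappers and exposes the underlying `Lock` directly, next to the listener's `number`. A plausible reading, sketched below purely as an illustration (this is not code from the PR, and `Slot` is hypothetical), is that subclasses can acquire several listener locks in ascending `number` order so that concurrent acquisitions cannot deadlock.

import gears.async.Listener

// Illustration only: lock two NumberedLock instances in a globally consistent order,
// assuming the numbering exists to define such an order.
class Slot extends Listener.NumberedLock:
  def lockBoth(other: Slot): Unit =
    val (first, second) =
      if this.number < other.number then (this, other) else (other, this)
    first.numberedLock.lock()
    second.numberedLock.lock()

  def unlockBoth(other: Slot): Unit =
    val (first, second) =
      if this.number < other.number then (this, other) else (other, this)
    second.numberedLock.unlock() // release in reverse acquisition order
    first.numberedLock.unlock()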
166 changes: 166 additions & 0 deletions shared/src/main/scala/async/Resource.scala
@@ -0,0 +1,166 @@
package gears.async

/** A Resource wraps the allocation and release of an asynchronously managed resource and grants access to it. It
* allows both structured access (similar to [[scala.util.Using]]) and unstructured allocation.
*/
trait Resource[+T]:
self =>

/** Run a structured action on the resource. It is allocated and released automatically.
*
* @param body
* the action to run on the resource
* @return
* the result of [[body]]
*/
def use[V](body: T => V)(using Async): V =
val res = allocated
try body(res._1)
finally res._2

/** Allocate the resource and leak it. **Use with caution**. The programmer is responsible for closing the resource
* with the returned handle.
*
* @return
* the allocated access to the resource data as well as a handle to close it
*/
def allocated(using Async): (T, Async ?=> Unit)

/** Create a derived resource that inherits the close operation.
*
* @param fn
* the function used to transform the resource data. It is only run on allocation/use.
* @return
* the transformed resource used to access the mapped resource data
*/
def map[U](fn: T => Async ?=> U): Resource[U] = new Resource[U]:
override def use[V](body: U => V)(using Async): V = self.use(t => body(fn(t)))
override def allocated(using Async): (U, (Async) ?=> Unit) =
val res = self.allocated
try
(fn(res._1), res._2)
catch
e =>
res._2
throw e
override def map[Q](fn2: U => (Async) ?=> Q): Resource[Q] = self.map(t => fn2(fn(t)))

/** Create a derived resource that creates an inner resource from the resource data. The inner resource is acquired
* together with the outer one, so it can both transform the resource data and add a new cleanup action.
*
* @param fn
* a function that creates an inner resource
* @return
* the transformed resource that provides the two-levels-in-one access
*/
def flatMap[U](fn: T => Async ?=> Resource[U]): Resource[U] = new Resource[U]:
override def use[V](body: U => V)(using Async): V = self.use(t => fn(t).use(body))
override def allocated(using Async): (U, (Async) ?=> Unit) =
val res = self.allocated
try
val mapped = fn(res._1).allocated
(
mapped._1,
{ closeAsync ?=>
try mapped._2(using closeAsync) // close inner first
finally res._2(using closeAsync) // then close the outer, even if closing the inner failed
}
)
catch
e =>
res._2
throw e
end Resource

object Resource:
/** Create a Resource from the allocation and release operation. The returned resource will allocate a new instance,
* i.e., call [[alloc]], for every call to [[use]] and [[allocated]].
*
* @param alloc
* the allocation (generating) operation
* @param close
* the release (close) operation
* @return
* a new Resource exposing the allocatable object in a safe way
*/
inline def apply[T](inline alloc: Async ?=> T, inline close: T => Async ?=> Unit): Resource[T] =
new Resource[T]:
def allocated(using Async): (T, (Async) ?=> Unit) =
val res = alloc
(res, close(res))

/** Create a concurrent computation resource from an allocator function. It can use the given capability to spawn
* [[Future]]s and return a handle to communicate with them. Allocation is only complete after that allocator
* returns. The resource is only allocated on use.
*
* If the [[Async.Spawn]] capability is used for [[Async.await]]ing, this may only happen synchronously, inside
* [[spawnBody]] itself.
*
* No assumption is made about reusability of the Resource: if [[spawnBody]] can be run multiple times, so can the
* Resource created from it.
*
* @param spawnBody
* the allocator that sets up and starts the asynchronous computation
* @return
* a new resource wrapping access to the spawnBody's results
*/
inline def spawning[T](inline spawnBody: Async.Spawn ?=> T) = Async.spawning.map(spawn => spawnBody(using spawn))
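A hedged illustration of `Resource.spawning` (not part of the diff): `WorkerHandle` and `startWorker` are hypothetical user code. The allocator runs under the resource's `Spawn` capability and returns a handle; when the resource is released, every Future it spawned is cancelled and awaited.

import gears.async.{Async, Resource}

trait WorkerHandle:
  def result()(using Async): Int

// Hypothetical allocator; it may spawn Futures with the given Spawn capability.
def startWorker()(using Async.Spawn): WorkerHandle = ???

val workerRes: Resource[WorkerHandle] = Resource.spawning(startWorker())

def run()(using Async): Int =
  workerRes.use { handle =>
    handle.result() // on return, all Futures spawned by startWorker are cancelled and awaited
  }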

/** Create a resource that needs neither asynchronous allocation nor cleanup.
*
* @param data
* the generator that provides the resource element
* @return
* a resource wrapping the data provider
*/
inline def just[T](inline data: => T) = apply(data, _ => ())

/** Create a resource combining access to two separate resources.
*
* @param res1
* the first resource
* @param res2
* the second resource
* @param join
* an operator that combines the elements of both resources into the element of the combined resource
* @return
* a new resource wrapping access to the combined element
*/
def both[T, U, V](res1: Resource[T], res2: Resource[U])(join: (T, U) => V): Resource[V] = new Resource[V]:
override def allocated(using Async): (V, (Async) ?=> Unit) =
val alloc1 = res1.allocated
val alloc2 =
try res2.allocated
catch
e =>
alloc1._2
throw e

try
val joined = join(alloc1._1, alloc2._1)
(
joined,
{ closeAsync ?=>
try alloc1._2(using closeAsync)
finally alloc2._2(using closeAsync)
}
)
catch
e =>
try alloc1._2
finally alloc2._2
throw e
end both

/** Create a resource combining access to a list of resources
*
* @param ress
* the list of single resources
* @return
* the resource of the list of elements provided by the single resources
*/
def all[T](ress: List[Resource[T]]): Resource[List[T]] = ress match
case Nil => just(Nil)
case head :: Nil => head.map(List(_))
case head :: next => both(head, all(next))(_ :: _)
end Resource
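An end-to-end sketch of the combinators above (not part of the diff); `Conn`, `openConn` and the value names are hypothetical stand-ins for some closeable handle.

import gears.async.{Async, Resource}

class Conn(val name: String):
  def close(): Unit = () // placeholder cleanup

def openConn(name: String)(using Async): Conn = Conn(name) // placeholder allocation

val db: Resource[Conn]    = Resource(openConn("db"), c => c.close())
val cache: Resource[Conn] = Resource(openConn("cache"), c => c.close())

// `both` releases both connections, even if one of the close operations fails.
val pair: Resource[(Conn, Conn)] = Resource.both(db, cache)((a, b) => (a, b))

def structured()(using Async): String =
  pair.use { case (a, b) => s"${a.name} + ${b.name}" } // both closed on return

def unstructured()(using async: Async): Unit =
  val (conn, close) = db.allocated // the caller now owns the cleanup
  try println(conn.name)
  finally close(using async)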
73 changes: 73 additions & 0 deletions shared/src/main/scala/async/Semaphore.scala
@@ -0,0 +1,73 @@
package gears.async

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

/** A semaphore that manages a number of grants. One can wait to obtain a grant (with [[acquire]]) and return it to the
* semaphore (with [[release]]).
*
* @param initialValue
* the initial number of grants held by this semaphore
*/
class Semaphore(initialValue: Int) extends Async.Source[Semaphore.Guard]:
self =>
private val value = AtomicInteger(initialValue)
private val waiting = ConcurrentLinkedQueue[Listener[Semaphore.Guard]]()

override def onComplete(k: Listener[Semaphore.Guard]): Unit =
if k.acquireLock() then // if k is gone, we are done
if value.getAndDecrement() > 0 then
// we got a ticket
k.complete(guard, this)
else
// no ticket -> add to queue and reset value (was now negative - unless concurrently increased)
k.releaseLock()
waiting.add(k)
guard.release()

override def dropListener(k: Listener[Semaphore.Guard]): Unit = waiting.remove(k)

override def poll(k: Listener[Semaphore.Guard]): Boolean =
if !k.acquireLock() then return true
val success = value.getAndUpdate(i => if i > 0 then i - 1 else i) > 0
if success then k.complete(guard, self) else k.releaseLock()
success

override def poll(): Option[Semaphore.Guard] =
if value.getAndUpdate(i => if i > 0 then i - 1 else i) > 0 then Some(guard) else None

/** Decrease the number of grants available from this semaphore, possibly waiting if none is available.
*
* @param a
* the async capability used for waiting
*/
inline def acquire()(using Async): Semaphore.Guard =
this.awaitResult // do not short-circuit because cancellation should be considered first

private object guard extends Semaphore.Guard:
/** Increase the number of grants available to this semaphore, possibly waking up a waiting [[acquire]].
*/
def release(): Unit =
// if value is < 0, a ticket is missing anyway -> do nothing now
if value.getAndUpdate(i => if i < 0 then i + 1 else i) >= 0 then
// we kept the ticket for now

var listener = waiting.poll()
while listener != null && !listener.completeNow(guard, self) do listener = waiting.poll()
// if the listener is not null, the loop stopped because it was completed -> the ticket is reused -> we are done

// if listener is null, return the ticket by incrementing, then recheck waiting queue (if incremented to >0)
if listener == null && value.getAndIncrement() >= 0 then
listener = waiting.poll()
if listener != null then // if null now, we are done
onComplete(listener)

object Semaphore:
/** A guard that marks a single usage of the [[Semaphore]]. Implements [[java.lang.AutoCloseable]] so it can be used
* as a try-with-resource (e.g. with [[scala.util.Using]]).
*/
trait Guard extends java.lang.AutoCloseable:
/** Release the semaphore; must be called exactly once. */
def release(): Unit

final def close() = release()
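A brief usage sketch (not part of the diff): since `Guard` is `AutoCloseable`, a grant can be scoped with `scala.util.Using` or a plain `try`/`finally`. `dbSlots`, `withSlot` and the limit of 4 are arbitrary examples.

import gears.async.{Async, Semaphore}
import scala.util.Using

val dbSlots = Semaphore(4) // at most 4 concurrent holders of a grant

def withSlot[T](body: => T)(using Async): T =
  Using.resource(dbSlots.acquire())(_ => body) // released via Guard.close()

def withSlotManual[T](body: => T)(using Async): T =
  val guard = dbSlots.acquire()
  try body
  finally guard.release() // must be called exactly once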