Skip to content

Commit

Permalink
StatelessWorker: pump work item queue consistently (#9064)
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Jul 11, 2024
1 parent 2af2970 commit 77a187e
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions src/Orleans.Runtime/Catalog/StatelessWorkerGrainContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Task Deactivated
get
{
var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_workItems.Enqueue(new(WorkItemType.DeactivatedTask, new DeactivatedTaskWorkItemState(completion)));
EnqueueWorkItem(WorkItemType.DeactivatedTask, new DeactivatedTaskWorkItemState(completion));
return completion.Task;
}
}
Expand All @@ -77,23 +77,27 @@ public void Activate(Dictionary<string, object>? requestContext, CancellationTok

public void ReceiveMessage(object message)
{
_workItems.Enqueue(new(WorkItemType.Message, message));
_workSignal.Signal();
EnqueueWorkItem(WorkItemType.Message, message);
}

public void Deactivate(DeactivationReason deactivationReason, CancellationToken cancellationToken)
{
_workItems.Enqueue(new(WorkItemType.Deactivate, new DeactivateWorkItemState(deactivationReason, cancellationToken)));
_workSignal.Signal();
EnqueueWorkItem(WorkItemType.Deactivate, new DeactivateWorkItemState(deactivationReason, cancellationToken));
}

public async ValueTask DisposeAsync()
{
var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_workItems.Enqueue(new(WorkItemType.DisposeAsync, new DisposeAsyncWorkItemState(completion)));
EnqueueWorkItem(WorkItemType.DisposeAsync, new DisposeAsyncWorkItemState(completion));
await completion.Task;
}

private void EnqueueWorkItem(WorkItemType type, object state)
{
_workItems.Enqueue(new(type, state));
_workSignal.Signal();
}

public bool Equals([AllowNull] IGrainContext other) => other is not null && ActivationId.Equals(other.ActivationId);

public TComponent? GetComponent<TComponent>() where TComponent : class => this switch
Expand Down Expand Up @@ -305,8 +309,7 @@ public void OnCreateActivation(IGrainContext grainContext)

public void OnDestroyActivation(IGrainContext grainContext)
{
_workItems.Enqueue((WorkItemType.OnDestroyActivation, grainContext));
_workSignal.Signal();
EnqueueWorkItem(WorkItemType.OnDestroyActivation, grainContext);
if (_workers.Count == 0)
{
_shared.InternalRuntime.Catalog.UnregisterMessageTarget(this);
Expand Down

0 comments on commit 77a187e

Please sign in to comment.