Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strong consistency, distributed, in-memory grain directory #9103

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .azure/pipelines/templates/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ jobs:
inputs:
command: 'test'
testRunTitle: ${{category}} on ${{framework}}
arguments: '--no-build --logger "trx;LogFilePrefix=testresults-${{framework}}-{{category}}" --framework ${{framework}} --configuration "${{parameters.build_configuration}}" --filter Category=${{category}} -- -parallel none -noshadow'
arguments: '--no-build --logger "trx;LogFilePrefix=testresults-${{framework}}-${{category}}" --framework ${{framework}} --configuration "${{parameters.build_configuration}}" --filter Category=${{category}} -- -parallel none -noshadow --blame-crash-dump full --blame-hang-timeout 10m --blame-hang-dump-type full'
publishTestResults: false # Doesn't merge correctly, use the explicit PublishTestResults task instead
- task: PublishTestResults@2
displayName: Publishing test results
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<PackageVersion Include="KubernetesClient" Version="12.1.1" />
<PackageVersion Include="CassandraCSharpDriver" Version="3.20.1" />
<!-- Test related packages -->
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="FluentAssertions" Version="6.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0-preview-23503-02" />
<PackageVersion Include="BenchmarkDotNet" Version="0.13.12" />
Expand Down
8 changes: 8 additions & 0 deletions src/Orleans.Core.Abstractions/Core/IGrainBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,12 @@ public enum DeactivationReasonCode : byte
/// </summary>
Migrating,
}

internal static class DeactivationReasonCodeExtensions
{
public static bool IsTransientError(this DeactivationReasonCode reasonCode)
{
return reasonCode is DeactivationReasonCode.DirectoryFailure;
}
}
}
6 changes: 5 additions & 1 deletion src/Orleans.Core.Abstractions/IDs/ClientGrainId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ namespace Orleans.Runtime
/// <summary>
/// Creates a new <see cref="ClientGrainId"/> instance.
/// </summary>
public static ClientGrainId Create(string id) => Create(IdSpan.Create(id));
public static ClientGrainId Create(string id)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(id);
return Create(IdSpan.Create(id));
}

/// <summary>
/// Creates a new <see cref="ClientGrainId"/> instance.
Expand Down
21 changes: 15 additions & 6 deletions src/Orleans.Core.Abstractions/IDs/GrainAddress.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Text.Json.Serialization;
using Orleans.GrainDirectory;

Expand Down Expand Up @@ -46,8 +48,9 @@ public sealed class GrainAddress : IEquatable<GrainAddress>, ISpanFormattable

public bool Equals(GrainAddress? other)
{
return other != null && (SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null)
&& _grainId.Equals(other._grainId) && _activationId.Equals(other._activationId) && MembershipVersion == other.MembershipVersion;
if (ReferenceEquals(this, other)) return true;
return MatchesGrainIdAndSilo(this, other)
&& _activationId.Equals(other._activationId);
}

/// <summary>
Expand All @@ -56,15 +59,21 @@ public bool Equals(GrainAddress? other)
/// </summary>
/// <param name="other"> The other <see cref="GrainAddress"/> to compare this one with.</param>
/// <returns> Returns <c>true</c> if the two <see cref="GrainAddress"/> are considered to match.</returns>
public bool Matches(GrainAddress other)
public bool Matches(GrainAddress? other)
{
return other is not null && _grainId.Equals(other._grainId) && (SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null)
if (ReferenceEquals(this, other)) return true;
return MatchesGrainIdAndSilo(this, other)
&& (_activationId.IsDefault || other._activationId.IsDefault || _activationId.Equals(other._activationId));
}

internal static bool MatchesGrainIdAndSilo(GrainAddress address, GrainAddress other)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static bool MatchesGrainIdAndSilo([NotNullWhen(true)] GrainAddress? address, [NotNullWhen(true)] GrainAddress? other)
{
return other is not null && address.GrainId.Equals(other.GrainId) && (address.SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null);
return other is not null
&& address is not null
&& address.GrainId.Equals(other.GrainId)
&& !(address.SiloAddress is null ^ other.SiloAddress is null)
&& (address.SiloAddress is null || address.SiloAddress.Equals(other.SiloAddress));
}

public override int GetHashCode() => HashCode.Combine(SiloAddress, _grainId, _activationId);
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Core.Abstractions/IDs/GrainId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ private GrainId(SerializationInfo info, StreamingContext context)
/// <summary>
/// Creates a new <see cref="GrainType"/> instance.
/// </summary>
public static GrainId Create(GrainType type, string key) => new GrainId(type, IdSpan.Create(key));
public static GrainId Create(GrainType type, string key)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(key);
return new GrainId(type, IdSpan.Create(key));
}

/// <summary>
/// Creates a new <see cref="GrainType"/> instance.
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Core.Abstractions/IDs/GrainInterfaceType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ namespace Orleans.Runtime
/// <summary>
/// Creates a <see cref="GrainInterfaceType"/> instance.
/// </summary>
public GrainInterfaceType(string value) => _value = IdSpan.Create(value);
public GrainInterfaceType(string value)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(value);
_value = IdSpan.Create(value);
}

/// <summary>
/// Creates a <see cref="GrainInterfaceType"/> instance.
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Core.Abstractions/IDs/GrainType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ private GrainType(SerializationInfo info, StreamingContext context)
/// <returns>
/// The newly created <see cref="GrainType"/> instance.
/// </returns>
public static GrainType Create(string value) => new GrainType(Encoding.UTF8.GetBytes(value));
public static GrainType Create(string value)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(value);
return new GrainType(Encoding.UTF8.GetBytes(value));
}

/// <summary>
/// Converts a <see cref="GrainType"/> to a <see cref="IdSpan"/>.
Expand Down
7 changes: 1 addition & 6 deletions src/Orleans.Core.Abstractions/IDs/IdSpan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ private IdSpan(SerializationInfo info, StreamingContext context)
/// <returns>
/// A new <see cref="IdSpan"/> corresponding to the provided id.
/// </returns>
/// <exception cref="ArgumentException"/>
public static IdSpan Create(string id)
{
ArgumentException.ThrowIfNullOrWhiteSpace(id, nameof(id));
return new IdSpan(Encoding.UTF8.GetBytes(id));
}
public static IdSpan Create(string id) => new(Encoding.UTF8.GetBytes(id));

/// <summary>
/// Returns a span representation of this instance.
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core.Abstractions/Runtime/MembershipVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public MembershipVersion(long version)
public override int GetHashCode() => this.Value.GetHashCode();

/// <inheritdoc/>
public override string ToString() => this.Value.ToString();
public override string ToString() => Value != MinValue.Value ? $"{Value}" : "default";

/// <summary>
/// Compares the provided operands for equality.
Expand Down
2 changes: 2 additions & 0 deletions src/Orleans.Core/Core/GrainFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainCl
public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null)
where TGrainInterface : IGrainWithStringKey
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(primaryKey);
var grainKey = IdSpan.Create(primaryKey);
return (TGrainInterface)GetGrain(typeof(TGrainInterface), grainKey, grainClassNamePrefix: grainClassNamePrefix);
}
Expand Down Expand Up @@ -164,6 +165,7 @@ public IGrain GetGrain(Type grainInterfaceType, long key)
/// <inheritdoc />
public IGrain GetGrain(Type grainInterfaceType, string key)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(key);
var grainKey = IdSpan.Create(key);
return (IGrain)GetGrain(grainInterfaceType, grainKey, grainClassNamePrefix: null);
}
Expand Down
1 change: 1 addition & 0 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ protected Connection(
this.LocalEndPoint = NormalizeEndpoint(this.Context.LocalEndPoint);
}

public ConnectionCommon Shared => shared;
public string ConnectionId => this.Context?.ConnectionId;
public virtual EndPoint RemoteEndPoint { get; }
public virtual EndPoint LocalEndPoint { get; }
Expand Down
4 changes: 4 additions & 0 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ internal static class Constants
public static readonly GrainType ManifestProviderType = SystemTargetGrainId.CreateGrainType("manifest");
public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator");
public static readonly GrainType ActivationRepartitionerType = SystemTargetGrainId.CreateGrainType("repartitioner");
public static readonly GrainType DirectoryReplicaType = SystemTargetGrainId.CreateGrainType("dir.replica");
public static readonly GrainType DirectoryReplicaClientType = SystemTargetGrainId.CreateGrainType("dir.replica.client");

public static readonly GrainId SiloDirectConnectionId = GrainId.Create(
GrainType.Create(GrainTypePrefix.SystemPrefix + "silo"),
Expand Down Expand Up @@ -53,6 +55,8 @@ internal static class Constants
{ManifestProviderType, "ManifestProvider"},
{ActivationMigratorType, "ActivationMigrator"},
{ActivationRepartitionerType, "ActivationRepartitioner"},
{DirectoryReplicaType, "DirectoryReplica"},
{DirectoryReplicaClientType, "DirectoryClient"},
}.ToFrozenDictionary();

public static string SystemTargetName(GrainType id) => SingletonSystemTargetNames.TryGetValue(id, out var name) ? name : id.ToString();
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Runtime/IRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ internal interface IRuntimeClient

IGrainReferenceRuntime GrainReferenceRuntime { get; }

void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo);
void BreakOutstandingMessagesToSilo(SiloAddress deadSilo);

// For testing purposes only.
int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType);
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public void Dispose()
disposed = true;
}

public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public void BreakOutstandingMessagesToSilo(SiloAddress deadSilo)
{
foreach (var callback in callbacks)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Orleans.Core/Serialization/OrleansJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ public override object ReadJson(JsonReader reader, Type objectType, object exist
JObject jo = JObject.Load(reader);
var id = jo["Id"];
GrainId grainId = GrainId.Create(id["Type"].ToObject<string>(), id["Key"].ToObject<string>());
var iface = GrainInterfaceType.Create(jo["Interface"].ToString());
var encodedInterface = jo["Interface"].ToString();
var iface = string.IsNullOrWhiteSpace(encodedInterface) ? default : GrainInterfaceType.Create(encodedInterface);
return this.referenceActivator.CreateReference(grainId, iface);
}
}
Expand Down
44 changes: 16 additions & 28 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -807,7 +806,7 @@ public async ValueTask DisposeAsync()
CancelPendingOperations();
lock (this)
{
State = ActivationState.Invalid;
SetState(ActivationState.Invalid);
}

DisposeTimers();
Expand Down Expand Up @@ -1177,14 +1176,11 @@ async Task ProcessOperationsAsync()
await ActivateAsync(command.RequestContext, command.CancellationToken).SuppressThrowing();
break;
case Command.Deactivate command:
await FinishDeactivating(command.CancellationToken, command.PreviousState).SuppressThrowing();
await FinishDeactivating(command.PreviousState, command.CancellationToken).SuppressThrowing();
break;
case Command.Delay command:
await Task.Delay(command.Duration, GrainRuntime.TimeProvider, command.CancellationToken);
break;
case Command.UnregisterFromCatalog:
UnregisterMessageTarget();
break;
default:
throw new NotSupportedException($"Encountered unknown operation of type {op?.GetType().ToString() ?? "null"} {op}.");
}
Expand Down Expand Up @@ -1462,6 +1458,15 @@ private void RerouteAllQueuedMessages()
return;
}

// If deactivation was caused by a transient failure, allow messages to be forwarded.
if (DeactivationReason.ReasonCode.IsTransientError())
{
foreach (var msg in msgs)
{
msg.ForwardCount = Math.Max(msg.ForwardCount - 1, 0);
}
}

if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
if (ForwardingAddress is { } address)
Expand Down Expand Up @@ -1497,7 +1502,6 @@ private async Task ActivateAsync(Dictionary<string, object>? requestContextData,
{
// A chain of promises that will have to complete in order to complete the activation
// Register with the grain directory and call the Activate method on the new activation.
var stopwatch = ValueStopwatch.StartNew();
try
{
// Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker.
Expand Down Expand Up @@ -1637,8 +1641,8 @@ private async Task ActivateAsync(Dictionary<string, object>? requestContextData,
}
}

_shared.InternalRuntime.ActivationWorkingSet.OnActivated(this);
_shared.InternalRuntime.ActivationWorkingSet.OnActivated(this);

if (_shared.Logger.IsEnabled(LogLevel.Debug))
{
_shared.Logger.LogDebug((int)ErrorCode.Catalog_AfterCallingActivate, "Finished activating grain {Grain}", this);
Expand Down Expand Up @@ -1676,11 +1680,6 @@ private async Task ActivateAsync(Dictionary<string, object>? requestContextData,
}
finally
{
if (cancellationToken.IsCancellationRequested && stopwatch.Elapsed.TotalMilliseconds > 50)
{
_shared.Logger.LogInformation("Cancellation requested for activation {Activation} took {ElapsedMilliseconds:0.0}ms.", this, stopwatch.Elapsed.TotalMilliseconds);
}

_workSignal.Signal();
}
}
Expand All @@ -1691,10 +1690,8 @@ private async Task ActivateAsync(Dictionary<string, object>? requestContextData,
/// <summary>
/// Completes the deactivation process.
/// </summary>
/// <param name="cancellationToken">A cancellation which terminates graceful deactivation when cancelled.</param>
private async Task FinishDeactivating(CancellationToken cancellationToken, ActivationState previousState)
private async Task FinishDeactivating(ActivationState previousState, CancellationToken cancellationToken)
{
var stopwatch = ValueStopwatch.StartNew();
var migrated = false;
var encounteredError = false;
try
Expand Down Expand Up @@ -1832,6 +1829,8 @@ private async Task FinishDeactivating(CancellationToken cancellationToken, Activ

_shared.InternalRuntime.ActivationWorkingSet.OnDeactivated(this);

UnregisterMessageTarget();

try
{
await DisposeAsync();
Expand All @@ -1841,15 +1840,9 @@ private async Task FinishDeactivating(CancellationToken cancellationToken, Activ
_shared.Logger.LogWarning(exception, "Exception disposing activation '{Activation}'.", this);
}

UnregisterMessageTarget();

// Signal deactivation
GetDeactivationCompletionSource().TrySetResult(true);
_workSignal.Signal();
if (cancellationToken.IsCancellationRequested && stopwatch.Elapsed.TotalMilliseconds > 50)
{
_shared.Logger.LogInformation("Cancellation requested for deactivation {Activation} took {ElapsedMilliseconds:0.0}ms.", this, stopwatch.Elapsed.TotalMilliseconds);
}
}

private TaskCompletionSource<bool> GetDeactivationCompletionSource()
Expand Down Expand Up @@ -2049,11 +2042,6 @@ public sealed class Delay(TimeSpan duration) : Command(new())
{
public TimeSpan Duration { get; } = duration;
}

public sealed class UnregisterFromCatalog() : Command(new())
{
public static readonly UnregisterFromCatalog Instance = new();
}
}

internal class ReentrantRequestTracker : Dictionary<Guid, int>
Expand Down
Loading
Loading