diff --git a/DevelopmentSandbox/DevelopmentSandbox.csproj b/DevelopmentSandbox/DevelopmentSandbox.csproj new file mode 100644 index 0000000..961e278 --- /dev/null +++ b/DevelopmentSandbox/DevelopmentSandbox.csproj @@ -0,0 +1,21 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + + diff --git a/DevelopmentSandbox/Program.cs b/DevelopmentSandbox/Program.cs new file mode 100644 index 0000000..9266a37 --- /dev/null +++ b/DevelopmentSandbox/Program.cs @@ -0,0 +1,115 @@ +using dotnet_etcd; +using Etcdserverpb; +using Google.Protobuf; +using Grpc.Core; +using Integration; +using Serilog; +using Serilog.Sinks.SystemConsole.Themes; + +namespace DevelopmentSandbox; // Note: actual namespace depends on the project name. + + + +internal class Program +{ + private static async Task Main(string[] args) + { + var cts = new CancellationTokenSource(); + AppDomain.CurrentDomain.ProcessExit += (_, _) => { cts.Cancel(); }; + Console.CancelKeyPress += (_, ea) => { cts.Cancel(); }; + ILogger logger = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( + theme: SystemConsoleTheme.Literate, + outputTemplate: + "[{Timestamp:HH:mm:ss.fff} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") + .Enrich.FromLogContext() + .CreateLogger(); + + var connection_string = Environment.GetEnvironmentVariable("ETCD_CONNECTION_STRING"); + + + var client = new EtcdClient( + connectionString: connection_string, + // handler: handler, + useLegacyRpcExceptionForCancellation: false); + + + var txn = new TxnRequest(); + for (int i = 0; i < 120; i++) + { + txn.Success.Add(new RequestOp() + { + RequestPut = new PutRequest() + { + Key = ByteString.CopyFromUtf8( "asdfadsfasdfdsf"+i), + Value = ByteString.CopyFromUtf8("dfasdvasdfasdf") + + } + }); + } + + while (true) + { + try + { + await client.TransactionAsync( + txn, + deadline: DateTime.UtcNow.AddMilliseconds(10)); + + Console.WriteLine("ok"); + } + catch (Exception e) + { + Console.WriteLine("fail"); + } + } + + + Func doJob = async () => + { + var leaseId = client.LeaseGrant(new LeaseGrantRequest() { TTL = 5 }).ID; + await client.HighlyReliableLeaseKeepAliveAsync( + leaseId, + 5, + retryDurationMs: 1000, + maxRetryBackoffMs: 400, + sleepAfterSuccessMs: 5000 / 3, + cts.Token).ConfigureAwait(false); + // await client.LeaseKeepAlive( + // leaseId, + // CancellationToken.None).ConfigureAwait(false); + }; + + List jobs = new List(1000); + + foreach (var i in Enumerable.Range( + 0, + 20000)) + { + + await Task.Delay(5); //что бы кипалайвы были в приоритете создания новых тасков + if (cts.Token.IsCancellationRequested) + { + break; + } + + var t = Task.Run( + async () => + { + cts.Token.ThrowIfCancellationRequested(); + Console.WriteLine(i); + try + { + await doJob().ConfigureAwait(false); + } + finally + { + cts.Cancel(); + } + }, + cts.Token); + jobs.Add(t); + } + + await await Task.WhenAny(jobs); + } +} \ No newline at end of file diff --git a/DevelopmentSandbox/Properties/launchSettings.json b/DevelopmentSandbox/Properties/launchSettings.json new file mode 100644 index 0000000..dcf8593 --- /dev/null +++ b/DevelopmentSandbox/Properties/launchSettings.json @@ -0,0 +1,11 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "DevelopmentSandbox": { + "commandName": "Project", + "environmentVariables": { + "ETCD_CONNECTION_STRING": "http://127.0.0.1:23790,http://127.0.0.1:23791,http://127.0.0.1:23792" + } + } + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..fc92529 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.5' + +services: + etcd0: + image: "gcr.io/etcd-development/etcd:v3.5.4" + ports: + - "23790:2379" + - "2379:2379" + command: + [ + "etcd", + "--name=etcd0", + "--advertise-client-urls=http://etcd0:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd0:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] + + etcd1: + image: "gcr.io/etcd-development/etcd:v3.5.4" + ports: + - "23791:2379" + command: + [ + "etcd", + "--name=etcd1", + "--advertise-client-urls=http://etcd1:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd1:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] + + etcd2: + image: "gcr.io/etcd-development/etcd:v3.5.4" + ports: + - "23792:2379" + command: + [ + "etcd", + "--name=etcd2", + "--advertise-client-urls=http://etcd2:2379", + "--listen-client-urls=http://0.0.0.0:2379", + "--initial-advertise-peer-urls=http://etcd2:2380", + "--listen-peer-urls=http://0.0.0.0:2380", + "--initial-cluster-token=etcd-cluster-1", + "--initial-cluster=etcd0=http://etcd0:2380,etcd1=http://etcd1:2380,etcd2=http://etcd2:2380", + "--initial-cluster-state=new" + ] diff --git a/dotnet-etcd.sln b/dotnet-etcd.sln index d9eb707..223fb8a 100644 --- a/dotnet-etcd.sln +++ b/dotnet-etcd.sln @@ -9,6 +9,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{8645A28E EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Integration", "tests\Integration\Integration.csproj", "{617202A0-4D3A-4FA7-978E-B5A01A3152BD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DevelopmentSandbox", "DevelopmentSandbox\DevelopmentSandbox.csproj", "{41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -23,6 +25,10 @@ Global {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Debug|Any CPU.Build.0 = Debug|Any CPU {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.ActiveCfg = Release|Any CPU {617202A0-4D3A-4FA7-978E-B5A01A3152BD}.Release|Any CPU.Build.0 = Release|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {41EEF9B2-F4CE-4C28-A2C1-1525AD32729C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/dotnet-etcd/LeaseExpiredOrNotFoundException.cs b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs new file mode 100644 index 0000000..cf61ff1 --- /dev/null +++ b/dotnet-etcd/LeaseExpiredOrNotFoundException.cs @@ -0,0 +1,42 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Runtime.Serialization; + +namespace dotnet_etcd +{ + [Serializable] + public class LeaseExpiredOrNotFoundException : Exception + { + private readonly long _leaseId; + + public LeaseExpiredOrNotFoundException(long leaseId) : base("LeaseExpiredOrNotFoundException: leaseId=" + leaseId) + { + _leaseId = leaseId; + } + + public LeaseExpiredOrNotFoundException(long leaseId, string message) : base(message) + { + _leaseId = leaseId; + } + + public LeaseExpiredOrNotFoundException(long leaseId, string message, Exception inner) : base( + message, + inner) + { + _leaseId = leaseId; + } + + protected LeaseExpiredOrNotFoundException( + SerializationInfo info, + StreamingContext context) : base( + info, + context) + { + info.AddValue( + name: "leaseId", + value: _leaseId); + } + } +} diff --git a/dotnet-etcd/dotnet-etcd.csproj b/dotnet-etcd/dotnet-etcd.csproj index ae50c36..601bc7f 100644 --- a/dotnet-etcd/dotnet-etcd.csproj +++ b/dotnet-etcd/dotnet-etcd.csproj @@ -66,6 +66,9 @@ Advanced uses take advantage of the consistency guarantees to implement database all runtime; build; native; contentfiles; analyzers + + + diff --git a/dotnet-etcd/etcdClient.cs b/dotnet-etcd/etcdClient.cs index de14ea7..9871a90 100644 --- a/dotnet-etcd/etcdClient.cs +++ b/dotnet-etcd/etcdClient.cs @@ -31,7 +31,7 @@ public partial class EtcdClient : IDisposable, IEtcdClient #region Initializers public EtcdClient(string connectionString, int port = 2379, - HttpClientHandler handler = null, bool ssl = false, + HttpMessageHandler handler = null, bool ssl = false, bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors) { if (string.IsNullOrWhiteSpace(connectionString)) diff --git a/dotnet-etcd/leaseClient.cs b/dotnet-etcd/leaseClient.cs index 5e41452..136bf85 100644 --- a/dotnet-etcd/leaseClient.cs +++ b/dotnet-etcd/leaseClient.cs @@ -2,13 +2,23 @@ // The .NET Foundation licenses this file to you under the MIT license. using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; +using System.Linq; +using System.Net.Http.Headers; +using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; - +using dotnet_etcd.multiplexer; using Etcdserverpb; +using Polly; using Grpc.Core; +using Polly.Contrib.WaitAndRetry; +using Polly.Timeout; namespace dotnet_etcd { @@ -109,6 +119,203 @@ public async Task LeaseKeepAlive(long leaseId, CancellationToken cancellationTok } }).ConfigureAwait(false); + + /// + /// HighlyReliableLeaseKeepAlive keeps lease alive by sending keep alive requests and receiving keep alive responses. + /// Reliability is achieved by sequentially sending keep alive requests at short intervals to all etcd nodes + /// + /// lease identifier + /// the remaining TTL at the time the method was called. used to determine initial deadlines + /// + /// + /// + /// + /// throws the exception if lease not found, expired, revoked or keep alive unsuccessfully + /// is received within the lease TTL or + public async Task HighlyReliableLeaseKeepAliveAsync(long leaseId, long leaseRemainigTTL, + int retryDurationMs, int maxRetryBackoffMs, int sleepAfterSuccessMs, CancellationToken cancellationToken) + { + int startNodeIndex = (new Random()).Next(_balancer._numNodes); + while (true) // keepAlive rounds + { + cancellationToken.ThrowIfCancellationRequested(); + var roundCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + int usedKeepAliveJobs = 0; + int delayBetweenUseNewKeepAliveJob = retryDurationMs / _balancer._numNodes; + startNodeIndex = ++startNodeIndex >= _balancer._numNodes ? 0 : startNodeIndex; + DateTime leaseExpiredAt = DateTime.UtcNow.AddSeconds(leaseRemainigTTL); + List> keepAliveJobs = new List>(); + while (usedKeepAliveJobs < _balancer._numNodes) + { + usedKeepAliveJobs++; + roundCancellationTokenSource.Token.ThrowIfCancellationRequested(); + //todo: вынести работу с выбором ноды в отдельный метод + int currentNodeIndex = startNodeIndex + usedKeepAliveJobs; + currentNodeIndex = currentNodeIndex >= _balancer._numNodes + ? currentNodeIndex - _balancer._numNodes + : currentNodeIndex; + Connection connection = _balancer._healthyNode.ElementAt(currentNodeIndex); + Task keepAliveJob = RetryUntilKeepAliveResponseAsync( + leaseId, + connection, + retryDurationMs, + maxRetryBackoffMs, + leaseExpiredAt, + roundCancellationTokenSource.Token); + keepAliveJobs.Add(keepAliveJob); + + await WhenAnySuccessLimitedAsync( + keepAliveJobs, + waitLimitMs: delayBetweenUseNewKeepAliveJob, + cancellationToken: roundCancellationTokenSource.Token).ConfigureAwait(false); + + if (IsAnyCompletedSuccessfully(keepAliveJobs, out var _)) + { + roundCancellationTokenSource.Cancel(); + break; + } + } + + try + { + //todo: не понятно что дедлайн лизы заложен внутри джоб, подумать как сделать проще + await Task.WhenAll(keepAliveJobs.ToArray()).ConfigureAwait(false); + } + catch (Exception e) + { + // ignored exceptions will handled later + } + + if (IsAnyCompletedSuccessfully( + keepAliveJobs, + out var keepAliveResponse) + && keepAliveResponse.TTL > 0) + { + + await Task.Delay( + sleepAfterSuccessMs, + cancellationToken).ConfigureAwait(false); + leaseRemainigTTL = Math.Max(0,keepAliveResponse.TTL - sleepAfterSuccessMs / 1000); + continue; //go to next round + } + //lease not found, expired or revoked or keep alive unsuccessfully + List exceptions = new List() + { + new LeaseExpiredOrNotFoundException(leaseId), + }; + exceptions.AddRange(keepAliveJobs + .SelectMany(job => job.Exception?.InnerExceptions) + .Where(exception=>exception != null)); // collect all exceptions + throw new AggregateException(exceptions); + } + + async Task WhenAnySuccessLimitedAsync(IEnumerable tasks,int waitLimitMs, CancellationToken cancellationToken) + { + List runningTasks = tasks?.ToList() ?? new List(); + Task waitLimit = Task.Delay( + waitLimitMs, + cancellationToken); + while (runningTasks.Count > 0) + { + cancellationToken.ThrowIfCancellationRequested(); + Task completedTask = await Task.WhenAny(runningTasks.Append(waitLimit)).ConfigureAwait(false); + if (completedTask.IsCompletedSuccessfully || completedTask == waitLimit) + { + return; + } + runningTasks.Remove(completedTask); + } + } + + + + async Task RetryUntilKeepAliveResponseAsync(long leaseId, Connection connection, + int retryDurationMs, int maxRetryBackoffMs, DateTime deadline, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + // timeoutPolicy thrown own exception, that overlap retry exceptions, + // so this list used for catch the retry exceptions. + List retryExceptions = new List(); + var timeoutPolicy = Policy.TimeoutAsync(deadline - DateTime.UtcNow); + + TimeSpan maxRetryBackoff = TimeSpan.FromMilliseconds(maxRetryBackoffMs); + var retryDelay = + Backoff.DecorrelatedJitterBackoffV2( + fastFirst: true, + medianFirstRetryDelay: TimeSpan.FromMilliseconds(100), //todo: вынести в параметры + retryCount: int.MaxValue) + .Select(s => s < maxRetryBackoff ? s : maxRetryBackoff); + var retryPolicy = Policy + //retry on all exceptions except LeaseExpiredOrNotFoundException + .Handle() + .WaitAndRetryAsync( + retryDelay, + onRetry: (exception, _) => retryExceptions.Add(exception)); + var retryTimeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(retryDurationMs)); + var policy = + Policy.WrapAsync( + timeoutPolicy, + retryPolicy, + retryTimeoutPolicy); + try + { + LeaseKeepAliveResponse response = await policy.ExecuteAsync( + continueOnCapturedContext: false, + cancellationToken: cancellationToken, + action: async retryCancellationToken => + { + retryCancellationToken.ThrowIfCancellationRequested(); + using AsyncDuplexStreamingCall leaser = + connection._leaseClient + .LeaseKeepAlive(cancellationToken: retryCancellationToken); + // ReSharper disable once MethodSupportsCancellation //method doesn't support cancellation + await leaser.RequestStream.WriteAsync( + new LeaseKeepAliveRequest() { ID = leaseId }); + bool result = await leaser.ResponseStream.MoveNext(retryCancellationToken) + .ConfigureAwait(false); + if (!result) + { + throw new RpcException( + new Status( + StatusCode.Aborted, + "didnt receive keepAlive response")); + } + + await leaser.RequestStream.CompleteAsync(); + return leaser.ResponseStream.Current; + }).ConfigureAwait(false); + return response; + } + catch (TimeoutRejectedException e) + { + throw new AggregateException( + retryExceptions + .Append(e) + .Reverse()); + } + } + + bool IsAnyCompletedSuccessfully(IEnumerable> tasks, + out T response) + { + foreach (Task call in tasks) + { + if (call.IsCompletedSuccessfully) + { + response = call.Result; + return true; + } + } + + response = default; + return false; + } + + } + + + + /// /// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client /// to the server and streaming keep alive responses from the server to the client. @@ -273,4 +480,4 @@ public async Task LeaseTimeToLiveAsync(LeaseTimeToLiveR CancellationToken cancellationToken = default) => await CallEtcdAsync(async (connection) => await connection._leaseClient .LeaseTimeToLiveAsync(request, headers, deadline, cancellationToken)).ConfigureAwait(false); } -} \ No newline at end of file +} diff --git a/dotnet-etcd/multiplexer/Balancer.cs b/dotnet-etcd/multiplexer/Balancer.cs index 7ac998d..aec4303 100644 --- a/dotnet-etcd/multiplexer/Balancer.cs +++ b/dotnet-etcd/multiplexer/Balancer.cs @@ -18,7 +18,7 @@ namespace dotnet_etcd.multiplexer internal class Balancer { - private readonly HashSet _healthyNode; + internal readonly HashSet _healthyNode; /// /// No of etcd nodes @@ -36,7 +36,7 @@ internal class Balancer private static readonly Random s_random = new Random(); - internal Balancer(List nodes, HttpClientHandler handler = null, bool ssl = false, + internal Balancer(List nodes, HttpMessageHandler handler = null, bool ssl = false, bool useLegacyRpcExceptionForCancellation = false, params Interceptor[] interceptors) { _numNodes = nodes.Count; @@ -57,6 +57,7 @@ internal Balancer(List nodes, HttpClientHandler handler = null, bool ssl = ThrowOperationCanceledOnCancellation = !useLegacyRpcExceptionForCancellation }); } + else { #if NETCOREAPP3_1 || NETCOREAPP3_0 diff --git a/tests/Integration/DevelopmentProcessTests.cs b/tests/Integration/DevelopmentProcessTests.cs deleted file mode 100644 index b9fa69f..0000000 --- a/tests/Integration/DevelopmentProcessTests.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System.Threading.Tasks; -using dotnet_etcd; -using DotnetNiceGrpcLogs; -using Etcdserverpb; -using NUnit.Framework; -using Serilog; -using Serilog.Sinks.SystemConsole.Themes; - -namespace Integration; - -public class DevelopmentProcessTests -{ - - - [SetUp] - public async Task Setup() - { - await Framework.CleanEtcdTestsKeys(); - } - - [TearDown] - public async Task TearDownAsync() - { - await Framework.CleanEtcdTestsKeys(); - } - - //debug space here - [Test] - public async Task Test1() - { - } -} \ No newline at end of file diff --git a/tests/Integration/Framework.cs b/tests/Integration/Framework.cs deleted file mode 100644 index d54a6cb..0000000 --- a/tests/Integration/Framework.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System.Threading.Tasks; -using dotnet_etcd; -using DotnetNiceGrpcLogs; -using Serilog; -using Serilog.Sinks.SystemConsole.Themes; - -namespace Integration; - -public static class Framework -{ - public const string TestPrefix = "/Tests/"; - - public static ILogger Logger { get; } = new LoggerConfiguration().MinimumLevel.Verbose().WriteTo.Console( - theme: SystemConsoleTheme.Literate, - outputTemplate: - "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}{Properties:j}{NewLine}") - .Enrich.FromLogContext() - .CreateLogger(); - - public static EtcdClient Client { get; } = new EtcdClient( - "http://localhost:23790,http://localhost:23791,http://localhost:23792", //todo: вытащить в конфигурацию - useLegacyRpcExceptionForCancellation: false, - interceptors: new GrpcLogsInterceptor( - Logger, - new LogsInterceptorOptions - { - //LoggerName = null, - IncludeLogData = true - })); - - public static async Task CleanEtcdTestsKeys() - { - await Client.DeleteRangeAsync(TestPrefix); - } -} \ No newline at end of file diff --git a/tests/Integration/Lease.cs b/tests/Integration/Lease.cs new file mode 100644 index 0000000..82eb293 --- /dev/null +++ b/tests/Integration/Lease.cs @@ -0,0 +1,177 @@ +using System; +using System.Data.Common; +using System.Diagnostics; +using System.Net; +using System.Net.Http; +using System.Security.Authentication; +using System.Threading; +using System.Threading.Tasks; +using dotnet_etcd; +using DotnetNiceGrpcLogs; +using Etcdserverpb; +using Grpc.Core; +using Integration.Utils; +using NUnit.Framework; +using Polly; + +namespace Integration; + +public class Lease +{ + private const string Etcd1 = "127.0.0.1:23790"; + private const string Etcd2 = "127.0.0.1:23791"; + private const string Etcd3 = "127.0.0.1:23792"; + private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}"; + + [SetUp] + public async Task Setup() + { + } + + [TearDown] + public async Task TearDownAsync() + { + } + + + [Test] + public async Task LeaseGranted() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrant(request); + }); + await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + Assert.AreEqual(message, request); + await delegateInterceptor.WriteResponseAsync(address, callId,response); + break; + } + + var rsp = responseTask.Result; + Assert.AreEqual(rsp, response); + } + + [Test] + public async Task LeaseGrantedAsync() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrantAsync(request); + }); + await foreach ((string address, Guid callId, LeaseGrantRequest message, bool _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + Assert.AreEqual(message, request); + await delegateInterceptor.WriteResponseAsync(address, callId,response); + break; + } + + var rsp = responseTask.Result; + Assert.AreEqual(rsp, response); + } + + [Test] + public async Task AfterExceptionsLeaseGranted() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + LeaseGrantResponse response = new() { ID = 888, TTL = 888 }; + var responseTask = Task.Run(() => + { + return client.LeaseGrantAsync(request); + }); + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); + + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + var current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException); + await iterator.MoveNextAsync(); + current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,unavailableException); + await iterator.MoveNextAsync(); + current = iterator.Current; + await delegateInterceptor.WriteResponseAsync(current.address, current.callId,response); + var rsp = responseTask.Result; + + Assert.AreEqual(rsp, response); + } + + [Test] + public async Task AfterThreeExceptionsLeaseGrantedFail() + { + var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); + + RpcException unavailableException = new RpcException( + new Status( + StatusCode.Unavailable, + "")); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty,unavailableException); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty,unavailableException); + LeaseGrantRequest request = new() { ID = 777, TTL = 777 }; + + var ex = Assert.Throws( + () => + { + try + { + client.LeaseGrantAsync(request).Wait(); + } + catch (AggregateException e) + { + + throw e.InnerException; + } + }); + Assert.That(ex.Status.StatusCode, Is.EqualTo(StatusCode.Unavailable)); + } + + [Test] + public async Task LeaseKeepAliveRequestSendedAfterDelay() + { + var delegateInterceptor = new DelegateInterceptor(true); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: + delegateInterceptor); + var responseTask = Task.Run( + () => + { + return client.LeaseKeepAlive( + 777, + CancellationToken.None); + }); + var iterator = delegateInterceptor.ReadAllRequests(CancellationToken.None).GetAsyncEnumerator(); + await iterator.MoveNextAsync(); + Assert.AreEqual(iterator.Current.message.ID, 777); + await delegateInterceptor.WriteResponseAsync(iterator.Current.address, Guid.Empty,new LeaseKeepAliveResponse() { ID = 777, TTL = 1 }); + var nextKeepAliveTask = iterator.MoveNextAsync(); + await Task.Delay(100); + Assert.True(nextKeepAliveTask.IsCompleted == false); + await Task.Delay(300); + Assert.True(nextKeepAliveTask.Result); + } +} diff --git a/tests/Integration/Maitenance.cs b/tests/Integration/Maitenance.cs new file mode 100644 index 0000000..332eeb0 --- /dev/null +++ b/tests/Integration/Maitenance.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using dotnet_etcd; +using Etcdserverpb; +using Google.Protobuf; +using Grpc.Core; +using Integration.Utils; +using NUnit.Framework; + +namespace Integration; + +public class Maitenance +{ + private const string Etcd1 = "127.0.0.1:23790"; + private const string Etcd2 = "127.0.0.1:23791"; + private const string Etcd3 = "127.0.0.1:23792"; + private const string ConnectionString = $"http://{Etcd1},http://{Etcd2},http://{Etcd3}"; + + [Test] + public async Task SnapshotTransferredConsistently() + { + var delegateInterceptor = new DelegateInterceptor(); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + List originalSnapshot = new() + { + new SnapshotResponse + { + RemainingBytes = 30, + Blob = ByteString.CopyFromUtf8("part1") + }, + new SnapshotResponse + { + RemainingBytes = 20, + Blob = ByteString.CopyFromUtf8("part2") + }, + new SnapshotResponse + { + RemainingBytes = 10, + Blob = ByteString.CopyFromUtf8("part3") + }, + }; + + SnapshotRequest request = new SnapshotRequest(); + List receivedSnapshot = new(); + var callTask = Task.Run( + () => client.Snapshot( + request, + rsp => + { + receivedSnapshot.Add(rsp); + }, + CancellationToken.None)); + await foreach (var (address, callId, message, _) in delegateInterceptor.ReadAllRequests(CancellationToken.None)) + { + foreach (var snapshotPart in originalSnapshot) + { + await delegateInterceptor.WriteResponseAsync(address, callId, snapshotPart); + } + + await delegateInterceptor.CloseResponseStreamAsync( + address, + callId); + break; + } + await callTask; + CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot); + } + + [Test] + public async Task SnapshotTransferredConsistentlyThrowExceptions() + { + var delegateInterceptor = new DelegateInterceptor(ignoreCallId: true); + var client = new EtcdClient( + connectionString: ConnectionString, + interceptors: delegateInterceptor); + + var snapshotPart1 = new SnapshotResponse + { + RemainingBytes = 30, + Blob = ByteString.CopyFromUtf8("part1") + }; + var snapshotPart2 = new SnapshotResponse + { + RemainingBytes = 20, + Blob = ByteString.CopyFromUtf8("part2") + }; + var snapshotPart3 = new SnapshotResponse + { + RemainingBytes = 10, + Blob = ByteString.CopyFromUtf8("part3") + }; + List originalSnapshot = new() + { + snapshotPart1, + snapshotPart2, + snapshotPart3, + }; + + RpcException unavailableException = new RpcException(new Status(StatusCode.Unavailable, "")); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd1, Guid.Empty, unavailableException); + await delegateInterceptor.CloseResponseStreamAsync(Etcd1, Guid.Empty); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, snapshotPart2); + await delegateInterceptor.WriteResponseAsync(Etcd2, Guid.Empty, unavailableException); + await delegateInterceptor.CloseResponseStreamAsync(Etcd2, Guid.Empty); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart1); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart2); + await delegateInterceptor.WriteResponseAsync(Etcd3, Guid.Empty, snapshotPart3); + await delegateInterceptor.CloseResponseStreamAsync(Etcd3, Guid.Empty); + + + SnapshotRequest request = new SnapshotRequest(); + List receivedSnapshot = new(); + await client.Snapshot( + request, + rsp => + { + receivedSnapshot.Add(rsp); + }, + CancellationToken.None); + + CollectionAssert.AreEqual(receivedSnapshot, originalSnapshot); + // test failed because current stream retry didnt correct + } +} \ No newline at end of file diff --git a/tests/Integration/Utils/DelegateInterceptor.cs b/tests/Integration/Utils/DelegateInterceptor.cs new file mode 100644 index 0000000..a698013 --- /dev/null +++ b/tests/Integration/Utils/DelegateInterceptor.cs @@ -0,0 +1,186 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Net.Client; + +namespace Integration.Utils; + +internal class DelegateInterceptor : Interceptor + where TReq : class + where TRsp : class +{ + private readonly MessageStore _requestsStore = new(); + private readonly MessageStore _responsesStore = new(); + private readonly bool _ignoreCallId = false; + + public DelegateInterceptor(bool ignoreCallId = false) + { + _ignoreCallId = ignoreCallId; + } + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address, callId, + request).Wait(); + _requestsStore.Complete(address, callId); + var enumerator = _responsesStore.GetReader(address,callId); + if (!enumerator.MoveNext().Result) throw new Exception("response required"); + var response = enumerator.Current; + _responsesStore.Complete(address,callId); + var call = new AsyncUnaryCall( + responseAsync: Task.FromResult(response), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + public override TResponse BlockingUnaryCall(TRequest request, + ClientInterceptorContext context, + BlockingUnaryCallContinuation continuation) + { + ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address,callId, + request).Wait(); + _requestsStore.Complete(address, callId); + var enumerator = _responsesStore.GetReader(address,callId); + if (!enumerator.MoveNext().Result) throw new Exception("response required"); + _responsesStore.Complete(address,callId); + return enumerator.Current; + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall( + ClientInterceptorContext context, + AsyncClientStreamingCallContinuation continuation) + { + ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); + string address = GetEtcdAdders(continuation); + var reader = _responsesStore.GetReader(address,callId); + var call = new AsyncClientStreamingCall( + requestStream: _requestsStore.GetWriter(address, callId), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }, + responseAsync: Task.Run( + async () => + { + await reader.MoveNext(); + _responsesStore.Complete(address,callId); + return reader.Current; + })); + return call; + } + + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, + ClientInterceptorContext context, + AsyncServerStreamingCallContinuation continuation) + { + ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); + string address = GetEtcdAdders(continuation); + _requestsStore.WriteAsync( + address,callId, + request).Wait(); + _requestsStore.Complete(address, callId); + var call = new AsyncServerStreamingCall( + responseStream: _responsesStore.GetReader(address,callId), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall( + ClientInterceptorContext context, + AsyncDuplexStreamingCallContinuation continuation) + { + ValidateCall(); + Guid callId = _ignoreCallId? Guid.Empty : Guid.NewGuid(); + string address = GetEtcdAdders(continuation); + AsyncDuplexStreamingCall call = new( + requestStream: _requestsStore.GetWriter(address,callId), + responseStream: _responsesStore.GetReader(address,callId), + responseHeadersAsync: Task.FromResult(new Metadata()), + getStatusFunc: () => new Status( + statusCode: StatusCode.OK, + detail: ""), + getTrailersFunc: () => new Metadata(), + disposeAction: () => { }); + return call; + } + + + public async Task WriteResponseAsync(string address,Guid callId, TRsp rsp) + { + await _responsesStore.WriteAsync(address, callId,rsp); + } + + public async Task WriteResponseAsync(string address, Guid callId, Exception exception) + { + await _responsesStore.WriteAsync( + address, callId, + null, + exception); + } + + public async Task CloseResponseStreamAsync(string address, Guid callId) + { + _responsesStore.Complete(address,callId); + } + + public IAsyncEnumerable ReadAllRequests(string address, Guid callId, CancellationToken cancellationToken) + { + return _requestsStore.GetReader(address, callId).ReadAllAsync(cancellationToken); + } + + public async IAsyncEnumerable<(string address, Guid callId, TReq message, bool closed)> ReadAllRequests(CancellationToken cancellationToken) + { + await foreach (var (address, callId, message, exception, closed) in _requestsStore.ReadMessages()) + { + if (exception != null) throw exception; + yield return (address, callId, (TReq)message!, closed); + } + } + + private static void ValidateCall() + { + if (typeof(TReq) != typeof(TRequest) || typeof(TRsp) != typeof(TResponse)) + throw new Exception("Interceptor not applicable to these call"); + } + + private static string GetEtcdAdders(Delegate continuation) + { + object target = continuation.Target!; + object invoker = target.GetType().GetField("invoker",BindingFlags.Instance|BindingFlags.NonPublic)! + .GetValue(target)!; + GrpcChannel channel = (GrpcChannel)invoker.GetType() + .GetProperty( + "Channel", + BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(invoker)!; + return channel.Target; + } +} \ No newline at end of file diff --git a/tests/Integration/Utils/MessageStore.cs b/tests/Integration/Utils/MessageStore.cs new file mode 100644 index 0000000..e490945 --- /dev/null +++ b/tests/Integration/Utils/MessageStore.cs @@ -0,0 +1,162 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Grpc.Core; +using NUnit.Framework; +using Polly; + +namespace Integration.Utils; + +internal class MessageStore +{ + private readonly object _locker = new(); + private readonly List<(string address, Guid callId, object? message, Exception? exception, bool closed)> _store = new(); + private readonly List> _readers = new(); + + + public async Task WriteAsync(string address, Guid callId, object? message, Exception? exception = null) => + await WriteAsync( + address, + callId, + message, + exception, + false); + + private async Task WriteAsync(string address, Guid callId, object? message, Exception? exception, bool closed) + { + await Task.Run( + () => + { + lock (_locker) + { + if (_store.Any(item => item.address == address && item.callId == callId && item.closed)) + throw new InvalidOperationException("call closed already"); + _store.Add((address, callId, message, exception, closed)); + foreach (Channel<(string addres, Guid callId, object? message, Exception? exception, bool closed)> channel in _readers) + { + channel.Writer.TryWrite((address, callId, message, exception, closed)); + } + } + }); + } + + public void Complete(string address, Guid callId) => + WriteAsync( + address, + callId, + null, + null, + true).Wait(); + + public async IAsyncEnumerable<(string address, Guid callId, object? message, Exception? exception, bool closed)> ReadMessages() + { + Channel<(string addres, Guid callId,object? message, Exception? exception, bool closed)>? channel = null; + try + { + lock (_locker) + { + channel = Channel.CreateUnbounded<(string addres, Guid callId, object? message, Exception? exception, bool closed)>(); + _readers.Add(channel); + List<(string address, Guid callId, object? message, Exception? exception, bool closed)> existingMessages = new(_store); + foreach ((string address, Guid callId, object? message, Exception? exception, bool closed) in existingMessages) + { + channel.Writer.TryWrite((address, callId, message, exception, closed)); + } + } + + await foreach ((string addres, Guid callId, object? message, Exception? exception, bool closed) in channel.Reader.ReadAllAsync()) + { + yield return (addres, callId, message, exception, closed); + } + } + finally + { + lock (_locker) + { + if (channel != null) + { + _readers.Remove(channel); + } + } + } + } + + public async IAsyncEnumerable<(object? message, Exception? exception)> ReadMessages(string address, Guid callId) + { + await foreach (var (addr, _callId, message, exception, closed) in ReadMessages()) + { + if (addr == address && callId == _callId) + { + if(closed) yield break; + yield return (message, exception); + } + } + } + + + + public IClientStreamWriter GetWriter(string address, Guid callId) + { + return new DelegateStreamWriter( + async message => await WriteAsync(address, callId, message, null), + async () => + { + Complete(address, callId); + }); + } + + public IAsyncStreamReader GetReader(string address, Guid callId) + { + return new StreamReader(ReadMessages(address, callId).GetAsyncEnumerator()); + } + + + + private class DelegateStreamWriter : IClientStreamWriter + { + private readonly Func _onWrite; + private readonly Func _onComplete; + + public DelegateStreamWriter(Func onWrite, Func onComplete) + { + _onWrite = onWrite; + _onComplete = onComplete; + } + + + public async Task WriteAsync(T message) + { + await _onWrite(message); + } + + + public WriteOptions? WriteOptions { get; set; } + public async Task CompleteAsync() + { + await _onComplete(); + } + } + + private class StreamReader : IAsyncStreamReader + { + private readonly IAsyncEnumerator<(object? message, Exception? exception)> _enumerator; + private readonly Func _getCurrentFunc; + + public StreamReader(IAsyncEnumerator<(object?, Exception?)> enumerator) + { + _enumerator = enumerator; + _getCurrentFunc = () => _enumerator.Current.exception == null + ? (T)_enumerator.Current.message! + : throw _enumerator.Current.exception; + } + public async Task MoveNext(CancellationToken cancellationToken) + { + return await _enumerator.MoveNextAsync(); + } + + public T Current => _getCurrentFunc(); + } +}