diff --git a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj index 354a359..0bec2b2 100644 --- a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj +++ b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj @@ -10,8 +10,11 @@ - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj index 4876e8a..e6d16f5 100644 --- a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj +++ b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj @@ -9,14 +9,17 @@ - - - - - + + + + + - - + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs index 812536b..b65e5cc 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs @@ -21,9 +21,10 @@ internal class ServerConnectionContext : TransportMessageHandler, IDisposable private readonly ConnectionLogger _logger; private readonly Dictionary> _methodHandlers; private readonly PayloadQueue _payloadQueue; - + private readonly TaskFactory _taskFactory; + public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogger logger, - Dictionary> methodHandlers) + Dictionary> methodHandlers, TaskFactory taskFactory) { CallContext = new NamedPipeCallContext(this); PipeStream = pipeStream; @@ -32,6 +33,7 @@ public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogge _methodHandlers = methodHandlers; _payloadQueue = new PayloadQueue(); CancellationTokenSource = new CancellationTokenSource(); + _taskFactory = taskFactory; } public NamedPipeServerStream PipeStream { get; } @@ -58,11 +60,13 @@ public IServerStreamWriter CreateResponseStream(Marshaller return new ResponseStreamWriterImpl(Transport, CancellationToken.None, responseMarshaller, () => IsCompleted); } - + public override void HandleRequestInit(string methodFullName, DateTime? deadline) { Deadline = new Deadline(deadline); - Task.Run(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false)); + // Note the use of .ConfigureAwait(false) here... + // https://blog.stephencleary.com/2012/07/dont-block-on-async-code.html + _taskFactory.StartNew(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false)); } public override void HandleHeaders(Metadata headers) => RequestHeaders = headers; diff --git a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs index e3806f7..1318d58 100644 --- a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs +++ b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs @@ -18,7 +18,7 @@ namespace GrpcDotNetNamedPipes.Internal; internal class ServerStreamPool : IDisposable { - private const int PoolSize = 4; + private readonly int PoolSize = 4; private const int FallbackMin = 100; private const int FallbackMax = 10_000; @@ -37,6 +37,8 @@ public ServerStreamPool(string pipeName, NamedPipeServerOptions options, _options = options; _handleConnection = handleConnection; _invokeError = invokeError; + if (options.ThreadPoolSize > 0) + PoolSize = options.ThreadPoolSize; } private NamedPipeServerStream CreatePipeServer() @@ -161,7 +163,8 @@ private void RunHandleConnection(NamedPipeServerStream pipeServer) try { await _handleConnection(pipeServer); - pipeServer.Disconnect(); + if (pipeServer.IsConnected) + pipeServer.Disconnect(); } catch (Exception error) { diff --git a/GrpcDotNetNamedPipes/NamedPipeServer.cs b/GrpcDotNetNamedPipes/NamedPipeServer.cs index 7a396f7..e83bd05 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServer.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServer.cs @@ -19,6 +19,7 @@ namespace GrpcDotNetNamedPipes; public class NamedPipeServer : IDisposable { private readonly ServerStreamPool _pool; + private readonly TaskFactory _taskFactory; private readonly Action _log; private readonly Dictionary> _methodHandlers = new(); @@ -32,11 +33,12 @@ public NamedPipeServer(string pipeName, NamedPipeServerOptions options) { } - internal NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log) + public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log) { _pool = new ServerStreamPool(pipeName, options, HandleConnection, InvokeError); _log = log; ServiceBinder = new ServiceBinderImpl(this); + _taskFactory = options.TaskFactory ?? new TaskFactory(); } public ServiceBinderBase ServiceBinder { get; } @@ -66,7 +68,7 @@ public void Dispose() private async Task HandleConnection(NamedPipeServerStream pipeStream) { var logger = ConnectionLogger.Server(_log); - var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers); + var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers, _taskFactory); await Task.Run(new PipeReader(pipeStream, ctx, logger, ctx.Dispose, InvokeError).ReadLoop); } diff --git a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs index 52e7461..0051e10 100644 --- a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs +++ b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs @@ -31,4 +31,17 @@ public class NamedPipeServerOptions /// public PipeSecurity PipeSecurity { get; set; } #endif + + /// + /// Gets or sets a Custom Task Factory to control how tasks are serviced. + /// For example, causing threads to be processsed in FIFO rather than LIFO + /// by using TaskCreationOptions.preferFairness + /// + public TaskFactory TaskFactory { get; set; } + + /// + /// Gets or sets a count of threads to use for the listener. + /// If you need to address a synchronous code execution issue, try increasing + /// + public int ThreadPoolSize { get; set; } = 4; } \ No newline at end of file