Skip to content

Commit

Permalink
STE: Provide indicators of progress (#517)
Browse files Browse the repository at this point in the history
extra: give access to executors within executor group
  • Loading branch information
nayato committed Oct 17, 2019
1 parent 5bfd679 commit a9d0723
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 23 deletions.
6 changes: 6 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Internal.Logging;
Expand Down Expand Up @@ -46,6 +47,11 @@ protected AbstractEventExecutor(IEventExecutorGroup parent)
/// <inheritdoc cref="IEventExecutor"/>
public bool InEventLoop => this.IsInEventLoop(Thread.CurrentThread);

/// <inheritdoc cref="IEventExecutor" />
public IEnumerable<IEventExecutor> Items => this.GetItems();

protected abstract IEnumerable<IEventExecutor> GetItems();

/// <inheritdoc cref="IEventExecutor"/>
public abstract bool IsInEventLoop(Thread thread);

Expand Down
5 changes: 5 additions & 0 deletions src/DotNetty.Common/Concurrency/AbstractEventExecutorGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -20,6 +21,8 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup

public abstract Task TerminationCompletion { get; }

public IEnumerable<IEventExecutor> Items => this.GetItems();

public abstract IEventExecutor GetNext();

public void Execute(IRunnable task) => this.GetNext().Execute(task);
Expand Down Expand Up @@ -65,5 +68,7 @@ public abstract class AbstractEventExecutorGroup : IEventExecutorGroup
public Task ShutdownGracefullyAsync() => this.ShutdownGracefullyAsync(DefaultShutdownQuietPeriod, DefaultShutdownTimeout);

public abstract Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout);

protected abstract IEnumerable<IEventExecutor> GetItems();
}
}
6 changes: 6 additions & 0 deletions src/DotNetty.Common/Concurrency/IEventExecutorGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
namespace DotNetty.Common.Concurrency
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Provides an access to a set of <see cref="IEventExecutor"/>s it manages.
/// </summary>
public interface IEventExecutorGroup : IScheduledExecutorService
{
/// <summary>
/// Returns list of owned event executors.
/// </summary>
IEnumerable<IEventExecutor> Items { get; }

/// <summary>
/// Returns <c>true</c> if and only if this executor is being shut down via <see cref="ShutdownGracefullyAsync()" />.
/// </summary>
Expand Down
51 changes: 35 additions & 16 deletions src/DotNetty.Common/Concurrency/SingleThreadEventExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SingleThreadEventExecutor : AbstractScheduledEventExecutor
PreciseTimeSpan gracefulShutdownQuietPeriod;
PreciseTimeSpan gracefulShutdownTimeout;
readonly ISet<Action> shutdownHooks = new HashSet<Action>();
long progress;

/// <summary>Creates a new instance of <see cref="SingleThreadEventExecutor"/>.</summary>
public SingleThreadEventExecutor(string threadName, TimeSpan breakoutInterval)
Expand Down Expand Up @@ -86,6 +87,21 @@ protected SingleThreadEventExecutor(IEventExecutorGroup parent, string threadNam
/// </summary>
public TaskScheduler Scheduler => this.scheduler;

/// <summary>
/// Allows to track whether executor is progressing through its backlog. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with IsBacklogEmpty property.
/// </summary>
public long Progress => Volatile.Read(ref this.progress);

/// <summary>
/// Indicates whether executor's backlog is empty. Useful for diagnosing / mitigating stalls due to blocking calls in conjunction with Progress property.
/// </summary>
public bool IsBacklogEmpty => this.taskQueue.IsEmpty;

/// <summary>
/// Gets length of backlog of tasks queued for immediate execution.
/// </summary>
public int BacklogLength => this.taskQueue.Count;

void Loop()
{
this.SetCurrentExecutor(this);
Expand Down Expand Up @@ -140,6 +156,8 @@ public override void Execute(IRunnable task)
}
}

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };

protected void WakeUp(bool inEventLoop)
{
if (!inEventLoop || (this.executionState == ST_SHUTTING_DOWN))
Expand All @@ -152,12 +170,12 @@ protected void WakeUp(bool inEventLoop)
/// Adds an <see cref="Action"/> which will be executed on shutdown of this instance.
/// </summary>
/// <param name="action">The <see cref="Action"/> to run on shutdown.</param>
public void AddShutdownHook(Action action)
public void AddShutdownHook(Action action)
{
if (this.InEventLoop)
if (this.InEventLoop)
{
this.shutdownHooks.Add(action);
}
}
else
{
this.Execute(() => this.shutdownHooks.Add(action));
Expand All @@ -169,53 +187,53 @@ public void AddShutdownHook(Action action)
/// executed on shutdown of this instance.
/// </summary>
/// <param name="action">The <see cref="Action"/> to remove.</param>
public void RemoveShutdownHook(Action action)
public void RemoveShutdownHook(Action action)
{
if (this.InEventLoop)
if (this.InEventLoop)
{
this.shutdownHooks.Remove(action);
}
}
else
{
this.Execute(() => this.shutdownHooks.Remove(action));
}
}

bool RunShutdownHooks()
bool RunShutdownHooks()
{
bool ran = false;

// Note shutdown hooks can add / remove shutdown hooks.
while (this.shutdownHooks.Count > 0)
while (this.shutdownHooks.Count > 0)
{
var copy = this.shutdownHooks.ToArray();
this.shutdownHooks.Clear();

for (var i = 0; i < copy.Length; i++)
{
try
try
{
copy[i]();
}
catch (Exception ex)
}
catch (Exception ex)
{
Logger.Warn("Shutdown hook raised an exception.", ex);
}
finally
}
finally
{
ran = true;
}
}
}

if (ran)
if (ran)
{
this.lastExecutionTime = PreciseTimeSpan.FromStart;
}

return ran;
}


/// <inheritdoc cref="IEventExecutor"/>
public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan timeout)
Expand Down Expand Up @@ -398,6 +416,7 @@ protected bool RunAllTasks()

while (true)
{
Volatile.Write(ref this.progress, this.progress + 1); // volatile write is enough as this is the only thread ever writing
SafeExecute(task);
task = this.PollTask();
if (task == null)
Expand Down
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/DispatcherEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -69,5 +70,7 @@ internal void Dispatch(NativeHandle handle)
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

public new IEnumerable<IEventLoop> Items => new[] { this };
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/DispatcherEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;
using DotNetty.Transport.Channels;
Expand All @@ -29,6 +30,10 @@ public DispatcherEventLoopGroup()

internal DispatcherEventLoop Dispatcher => this.dispatcherEventLoop;

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this.dispatcherEventLoop };

public new IEnumerable<IEventLoop> Items => new[] { this.dispatcherEventLoop };

IEventLoop IEventLoopGroup.GetNext() => (IEventLoop)this.GetNext();

public override IEventExecutor GetNext() => this.dispatcherEventLoop;
Expand Down
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/EventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace DotNetty.Transport.Libuv
{
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Transport.Channels;

Expand All @@ -19,5 +20,7 @@ public EventLoop(IEventLoopGroup parent, string threadName)
public Task RegisterAsync(IChannel channel) => channel.Unsafe.RegisterAsync(this);

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

public new IEnumerable<IEventLoop> Items => new[] { this };
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/EventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -27,6 +28,8 @@ public sealed class EventLoopGroup : AbstractEventExecutorGroup, IEventLoopGroup

public override Task TerminationCompletion { get; }

public new IEnumerable<IEventLoop> Items => this.eventLoops;

public EventLoopGroup()
: this(DefaultEventLoopCount)
{
Expand Down Expand Up @@ -119,5 +122,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
}
return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
}
}
10 changes: 6 additions & 4 deletions src/DotNetty.Transport.Libuv/LoopExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
Expand All @@ -17,7 +18,6 @@ namespace DotNetty.Transport.Libuv
using System.Threading;
using DotNetty.Common;
using DotNetty.Transport.Libuv.Native;

using Timer = Native.Timer;

class LoopExecutor : AbstractScheduledEventExecutor
Expand Down Expand Up @@ -297,7 +297,7 @@ void RunAllTasks(long timeout)
long runTasks = 0;
long executionTime;
this.wakeUp = false;
for (;;)
for (; ; )
{
SafeExecute(task);

Expand Down Expand Up @@ -402,7 +402,7 @@ static bool RunAllTasksFrom(IQueue<IRunnable> taskQueue)
{
return false;
}
for (;;)
for (; ; )
{
SafeExecute(task);
task = PollTaskFrom(taskQueue);
Expand Down Expand Up @@ -488,7 +488,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
bool inEventLoop = this.InEventLoop;
bool wakeUpLoop;
int oldState;
for (;;)
for (; ; )
{
if (this.IsShuttingDown)
{
Expand Down Expand Up @@ -540,5 +540,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time

return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => new[] { this };
}
}
3 changes: 3 additions & 0 deletions src/DotNetty.Transport.Libuv/WorkerEventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
Expand Down Expand Up @@ -112,6 +113,8 @@ void OnRead(Pipe handle, int status)

public new IEventLoopGroup Parent => (IEventLoopGroup)base.Parent;

IEnumerable<IEventLoop> IEventLoopGroup.Items => new[] { this };

sealed class PipeConnect : ConnectRequest
{
const int MaximumRetryCount = 10;
Expand Down
5 changes: 5 additions & 0 deletions src/DotNetty.Transport.Libuv/WorkerEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace DotNetty.Transport.Libuv
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Linq;
Expand Down Expand Up @@ -84,6 +85,8 @@ public WorkerEventLoopGroup(DispatcherEventLoopGroup eventLoopGroup, int eventLo

internal string PipeName { get; }

IEnumerable<IEventLoop> IEventLoopGroup.Items => this.eventLoops;

internal void Accept(NativeHandle handle)
{
Debug.Assert(this.dispatcherLoop != null);
Expand Down Expand Up @@ -126,5 +129,7 @@ public override Task ShutdownGracefullyAsync(TimeSpan quietPeriod, TimeSpan time
}
return this.TerminationCompletion;
}

protected override IEnumerable<IEventExecutor> GetItems() => this.eventLoops;
}
}
5 changes: 5 additions & 0 deletions src/DotNetty.Transport/Channels/AffinitizedEventLoopGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace DotNetty.Transport.Channels
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetty.Common.Concurrency;

Expand All @@ -23,6 +24,10 @@ public class AffinitizedEventLoopGroup : AbstractEventExecutorGroup, IEventLoopG
/// <inheritdoc cref="IEventExecutorGroup"/>
public override Task TerminationCompletion => this.innerGroup.TerminationCompletion;

protected override IEnumerable<IEventExecutor> GetItems() => this.innerGroup.Items;

public new IEnumerable<IEventLoop> Items => ((IEventLoopGroup)this.innerGroup).Items;

/// <summary>
/// Creates a new instance of <see cref="AffinitizedEventLoopGroup"/>.
/// </summary>
Expand Down
Loading

0 comments on commit a9d0723

Please sign in to comment.