diff --git a/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/AsyncAnalyzerWorkQueue.cs b/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/AsyncAnalyzerWorkQueue.cs new file mode 100644 index 0000000000..3ef53aec9b --- /dev/null +++ b/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/AsyncAnalyzerWorkQueue.cs @@ -0,0 +1,318 @@ +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.CodeAnalysis; +using Microsoft.Extensions.Logging; + +#nullable enable + +namespace OmniSharp.Roslyn.CSharp.Workers.Diagnostics +{ + public class AsyncAnalyzerWorkQueue + { + private readonly object _lock = new(); + private readonly Queue _foreground = new(); + private readonly Queue _background = new(); + private readonly ILogger _logger; + private TaskCompletionSource _takeWorkWaiter = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public AsyncAnalyzerWorkQueue(ILoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger(); + } + + public int PendingCount + { + get + { + lock (_lock) + return _foreground.PendingCount + _background.PendingCount; + } + } + + public void PutWork(IReadOnlyCollection documentIds, AnalyzerWorkType workType) + { + lock (_lock) + { + foreach (var documentId in documentIds) + { + _foreground.RequestCancellationIfActive(documentId); + _background.RequestCancellationIfActive(documentId); + + if (workType == AnalyzerWorkType.Foreground) + _foreground.Enqueue(documentId); + else if (workType == AnalyzerWorkType.Background) + _background.Enqueue(documentId); + } + + // Complete the work waiter task to allow work to be taken from the queue. + if (!_takeWorkWaiter.Task.IsCompleted) + _takeWorkWaiter.SetResult(null); + } + } + + public async Task TakeWorkAsync(CancellationToken cancellationToken = default) + { + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + Task awaitTask; + + lock (_lock) + { + if (_foreground.TryDequeue(out var documentId, out var cancellationTokenSource)) + { + return new QueueItem + ( + DocumentId: documentId, + CancellationToken: cancellationTokenSource.Token, + AnalyzerWorkType: AnalyzerWorkType.Foreground, + DocumentCount: _foreground.MaximumPendingCount, + DocumentCountRemaining: _foreground.PendingCount + ); + } + else if (_background.TryDequeue(out documentId, out cancellationTokenSource)) + { + return new QueueItem + ( + DocumentId: documentId, + CancellationToken: cancellationTokenSource.Token, + AnalyzerWorkType: AnalyzerWorkType.Background, + DocumentCount: _background.MaximumPendingCount, + DocumentCountRemaining: _background.PendingCount + ); + } + + if (_foreground.PendingCount == 0 && _background.PendingCount == 0 && _takeWorkWaiter.Task.IsCompleted) + _takeWorkWaiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + awaitTask = _takeWorkWaiter.Task; + } + + // There is no chance of the default cancellation token being cancelled, so we can + // simply wait for work to be queued. Otherwise, we need to handle the case that the + // token is cancelled before we have work to return. + if (cancellationToken == default) + { + await awaitTask.ConfigureAwait(false); + } + else + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using (cancellationToken.Register(() => tcs.SetResult(null))) + { + await Task.WhenAny(awaitTask, tcs.Task).ConfigureAwait(false); + } + } + } + } + + public void WorkComplete(QueueItem item) + { + lock (_lock) + { + if (item.AnalyzerWorkType == AnalyzerWorkType.Foreground) + _foreground.WorkComplete(item.DocumentId, item.CancellationToken); + else if (item.AnalyzerWorkType == AnalyzerWorkType.Background) + _background.WorkComplete(item.DocumentId, item.CancellationToken); + } + } + + public async Task WaitForegroundWorkComplete(CancellationToken cancellationToken = default) + { + if (cancellationToken.IsCancellationRequested) + return; + + Task waitForgroundTask; + + lock (_lock) + waitForgroundTask = _foreground.GetWaiter(); + + if (waitForgroundTask.IsCompleted) + return; + + if (cancellationToken == default) + { + await waitForgroundTask.ConfigureAwait(false); + + return; + } + + var taskCompletion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using (cancellationToken.Register(() => taskCompletion.SetResult(null))) + { + await Task.WhenAny(taskCompletion.Task, waitForgroundTask).ConfigureAwait(false); + + if (!waitForgroundTask.IsCompleted) + _logger.LogWarning($"Timeout before work got ready for foreground analysis queue. This is assertion to prevent complete api hang in case of error."); + } + } + + public bool TryPromote(DocumentId id) + { + var shouldEnqueue = false; + + lock (_lock) + { + shouldEnqueue = _background.IsEnqueued(id) || _background.IsActive(id); + } + + if (shouldEnqueue) + PutWork(new[] { id }, AnalyzerWorkType.Foreground); + + return shouldEnqueue; + } + + public record QueueItem + ( + DocumentId DocumentId, + CancellationToken CancellationToken, + AnalyzerWorkType AnalyzerWorkType, + int DocumentCount, + int DocumentCountRemaining + ); + + private class Queue + { + private readonly HashSet _pendingHash = new(); + private readonly Queue _pendingQueue = new(); + private readonly Dictionary> _active = new(); + private readonly List<(HashSet DocumentIds, TaskCompletionSource TaskCompletionSource)> _waiters = new(); + + public int PendingCount => _pendingQueue.Count; + + public int ActiveCount => _active.Count; + + public int MaximumPendingCount { get; private set; } + + public void RequestCancellationIfActive(DocumentId documentId) + { + if (_active.TryGetValue(documentId, out var active)) + { + foreach (var cts in active) + cts.Cancel(); + } + } + + public void Enqueue(DocumentId documentId) + { + if (_pendingHash.Add(documentId)) + { + _pendingQueue.Enqueue(documentId); + + if (_pendingQueue.Count > MaximumPendingCount) + MaximumPendingCount = _pendingQueue.Count; + } + } + + public bool IsEnqueued(DocumentId documentId) => + _pendingHash.Contains(documentId); + + public bool IsActive(DocumentId documentId) => + _active.ContainsKey(documentId); + + public void Remove(DocumentId documentId) + { + if (_pendingHash.Contains(documentId)) + { + _pendingHash.Remove(documentId); + + var backgroundQueueItems = _pendingQueue.ToList(); + + _pendingQueue.Clear(); + + foreach (var item in backgroundQueueItems) + { + if (item != documentId) + _pendingQueue.Enqueue(item); + } + } + } + + public bool TryDequeue([NotNullWhen(true)] out DocumentId? documentId, [NotNullWhen(true)] out CancellationTokenSource? cancellationTokenSource) + { + if (_pendingQueue.Count > 0) + { + documentId = _pendingQueue.Dequeue(); + + _pendingHash.Remove(documentId); + + if (!_active.TryGetValue(documentId, out var cancellationTokenSources)) + _active[documentId] = cancellationTokenSources = new List(); + + cancellationTokenSource = new CancellationTokenSource(); + + cancellationTokenSources.Add(cancellationTokenSource); + + return true; + } + + documentId = null; + cancellationTokenSource = null; + + return false; + } + + public void WorkComplete(DocumentId documentId, CancellationToken cancellationToken) + { + if (_active.TryGetValue(documentId, out var cancellationTokenSources)) + { + foreach (var cancellationTokenSource in cancellationTokenSources.ToList()) + { + if (cancellationTokenSource.Token == cancellationToken) + { + cancellationTokenSource.Dispose(); + + cancellationTokenSources.Remove(cancellationTokenSource); + + break; + } + } + + if (cancellationTokenSources.Count == 0) + _active.Remove(documentId); + + var isReenqueued = cancellationToken.IsCancellationRequested + && (_pendingHash.Contains(documentId) || _active.ContainsKey(documentId)); + + if (!isReenqueued) + { + foreach (var waiter in _waiters.ToList()) + { + if (waiter.DocumentIds.Remove(documentId) && waiter.DocumentIds.Count == 0) + { + waiter.TaskCompletionSource.SetResult(null); + + _waiters.Remove(waiter); + } + } + } + } + } + + public Task GetWaiter() + { + if (_active.Count == 0 && _pendingQueue.Count == 0) + return Task.CompletedTask; + + var documentIds = new HashSet(_pendingHash.Concat(_active.Keys)); + + var waiter = _waiters.FirstOrDefault(x => x.DocumentIds.SetEquals(documentIds)); + + if (waiter == default) + { + waiter = (documentIds, new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + + _waiters.Add(waiter); + } + + return waiter.TaskCompletionSource.Task; + } + } + } +} diff --git a/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/CSharpDiagnosticWorkerWithAnalyzers.cs b/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/CSharpDiagnosticWorkerWithAnalyzers.cs index a9a2c2daaf..ddf73f6344 100644 --- a/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/CSharpDiagnosticWorkerWithAnalyzers.cs +++ b/src/OmniSharp.Roslyn.CSharp/Workers/Diagnostics/CSharpDiagnosticWorkerWithAnalyzers.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; @@ -23,8 +23,7 @@ namespace OmniSharp.Roslyn.CSharp.Services.Diagnostics { public class CSharpDiagnosticWorkerWithAnalyzers : CSharpDiagnosticWorkerBase, IDisposable { - private readonly AnalyzerWorkQueue _workQueue; - private readonly SemaphoreSlim _throttler; + private readonly AsyncAnalyzerWorkQueue _workQueue; private readonly ILogger _logger; private readonly ConcurrentDictionary _currentDiagnosticResultLookup = new(); @@ -32,7 +31,8 @@ public class CSharpDiagnosticWorkerWithAnalyzers : CSharpDiagnosticWorkerBase, I private readonly DiagnosticEventForwarder _forwarder; private readonly OmniSharpOptions _options; private readonly OmniSharpWorkspace _workspace; - private const int WorkerWait = 250; + + private int _projectCount = 0; public CSharpDiagnosticWorkerWithAnalyzers( OmniSharpWorkspace workspace, @@ -46,8 +46,7 @@ public CSharpDiagnosticWorkerWithAnalyzers( { _logger = loggerFactory.CreateLogger(); _providers = providers.ToImmutableArray(); - _workQueue = new AnalyzerWorkQueue(loggerFactory, timeoutForPendingWorkMs: options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs * 3); - _throttler = new SemaphoreSlim(options.RoslynExtensionsOptions.DiagnosticWorkersThreadCount); + _workQueue = new AsyncAnalyzerWorkQueue(loggerFactory); _forwarder = forwarder; _options = options; @@ -57,8 +56,8 @@ public CSharpDiagnosticWorkerWithAnalyzers( _workspace.WorkspaceChanged += OnWorkspaceChanged; _workspace.OnInitialized += OnWorkspaceInitialized; - Task.Factory.StartNew(() => Worker(AnalyzerWorkType.Foreground), TaskCreationOptions.LongRunning); - Task.Factory.StartNew(() => Worker(AnalyzerWorkType.Background), TaskCreationOptions.LongRunning); + for (var i = 0; i < options.RoslynExtensionsOptions.DiagnosticWorkersThreadCount; i++) + Task.Run(Worker); OnWorkspaceInitialized(_workspace.Initialized); } @@ -95,87 +94,73 @@ private async Task> GetDiagnosticsByDocument _workQueue.TryPromote(documentId); } - await _workQueue.WaitForegroundWorkComplete(); + using var cancellationTokenSource = new CancellationTokenSource(_options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs * 3); + + await _workQueue.WaitForegroundWorkComplete(cancellationTokenSource.Token); } return documentIds - .Where(x => _currentDiagnosticResultLookup.ContainsKey(x)) - .Select(x => _currentDiagnosticResultLookup[x]) + .Select(x => _currentDiagnosticResultLookup.TryGetValue(x, out var value) ? value : null) + .Where(x => x != null) .ToImmutableArray(); } - private async Task Worker(AnalyzerWorkType workType) + private async Task Worker() { while (true) { + AsyncAnalyzerWorkQueue.QueueItem item = null; + DocumentId documentId; + CancellationToken? cancellationToken = null; + AnalyzerWorkType workType; + int documentCount; + int remaining; + try { - var solution = _workspace.CurrentSolution; + item = await _workQueue.TakeWorkAsync(); + (documentId, cancellationToken, workType, documentCount, remaining) = item; - var documents = _workQueue - .TakeWork(workType) - .Select(documentId => (projectId: solution.GetDocument(documentId)?.Project?.Id, documentId)) - .Where(x => x.projectId != null) - .ToImmutableArray(); - - if (documents.IsEmpty) + if (workType == AnalyzerWorkType.Background) { - _workQueue.WorkComplete(workType); - - await Task.Delay(WorkerWait); - - continue; - } - - var documentCount = documents.Length; - var documentCountRemaining = documentCount; - - // event every percentage increase, or every 10th if there are fewer than 1000 - var eventEvery = Math.Max(10, documentCount / 100); - - var documentsGroupedByProjects = documents - .GroupBy(x => x.projectId, x => x.documentId) - .ToImmutableArray(); - var projectCount = documentsGroupedByProjects.Length; + // event every percentage increase, or every 10th if there are fewer than 1000 + var eventEvery = Math.Max(10, documentCount / 100); - EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Started, projectCount, documentCount, documentCountRemaining); + if (documentCount == remaining + 1) + EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Started, _projectCount, documentCount, remaining); - void decrementDocumentCountRemaining() - { - var remaining = Interlocked.Decrement(ref documentCountRemaining); var done = documentCount - remaining; - if (done % eventEvery == 0) - { - EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Progress, projectCount, documentCount, remaining); - } + if (done % eventEvery == 0 || remaining == 0) + EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Progress, _projectCount, documentCount, remaining); } + var solution = _workspace.CurrentSolution; + var projectId = solution.GetDocument(documentId)?.Project?.Id; + try { - var projectAnalyzerTasks = - documentsGroupedByProjects - .Select(projectGroup => Task.Run(async () => - { - var projectPath = solution.GetProject(projectGroup.Key).FilePath; - await AnalyzeProject(solution, projectGroup, decrementDocumentCountRemaining); - })) - .ToImmutableArray(); - - await Task.WhenAll(projectAnalyzerTasks); + if (projectId != null) + await AnalyzeDocument(solution, projectId, documentId, cancellationToken.Value); } finally { - EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Finished, projectCount, documentCount, documentCountRemaining); + if (remaining == 0) + EventIfBackgroundWork(workType, BackgroundDiagnosticStatus.Finished, _projectCount, documentCount, remaining); } - - _workQueue.WorkComplete(workType); - - await Task.Delay(WorkerWait); + } + catch (OperationCanceledException) when (cancellationToken != null && cancellationToken.Value.IsCancellationRequested) + { + _logger.LogInformation($"Analyzer work cancelled."); } catch (Exception ex) { _logger.LogError($"Analyzer worker failed: {ex}"); } + finally + { + if (item != null) + _workQueue.WorkComplete(item); + } } } @@ -185,8 +170,17 @@ private void EventIfBackgroundWork(AnalyzerWorkType workType, BackgroundDiagnost _forwarder.BackgroundDiagnosticsStatus(status, numberProjects, numberFiles, numberFilesRemaining); } - private void QueueForAnalysis(ImmutableArray documentIds, AnalyzerWorkType workType) => + private void QueueForAnalysis(ImmutableArray documentIds, AnalyzerWorkType workType) + { + if (workType == AnalyzerWorkType.Background) + { + var solution = _workspace.CurrentSolution; + + _projectCount = documentIds.Select(x => solution.GetDocument(x)?.Project?.Id).Distinct().Count(x => x != null); + } + _workQueue.PutWork(documentIds, workType); + } private void OnWorkspaceChanged(object sender, WorkspaceChangeEventArgs changeEvent) { @@ -231,99 +225,73 @@ public override async Task> AnalyzeDocumentAsync(Documen var allAnalyzers = GetAnalyzersForProject(project); var compilation = await project.GetCompilationAsync(cancellationToken); - cancellationToken.ThrowIfCancellationRequested(); - return await AnalyzeDocument(project, allAnalyzers, compilation, CreateAnalyzerOptions(document.Project), document); + return await AnalyzeDocument(project, allAnalyzers, compilation, CreateAnalyzerOptions(document.Project), document, cancellationToken); } public override async Task> AnalyzeProjectsAsync(Project project, CancellationToken cancellationToken) { - var allAnalyzers = GetAnalyzersForProject(project); - var compilation = await project.GetCompilationAsync(cancellationToken); - var workspaceAnalyzerOptions = CreateAnalyzerOptions(project); - var documentAnalyzerTasks = new List(); - var diagnostics = ImmutableList.Empty; - - foreach (var document in project.Documents) - { - await _throttler.WaitAsync(cancellationToken); + var documentIds = project.DocumentIds.ToImmutableArray(); - documentAnalyzerTasks.Add(Task.Run(async () => - { - try - { - var documentDiagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document); - ImmutableInterlocked.Update(ref diagnostics, currentDiagnostics => currentDiagnostics.AddRange(documentDiagnostics)); - } - finally - { - _throttler.Release(); - } - }, cancellationToken)); - } + QueueForAnalysis(documentIds, AnalyzerWorkType.Foreground); - await Task.WhenAll(documentAnalyzerTasks); + await _workQueue.WaitForegroundWorkComplete(cancellationToken); - return diagnostics; + return documentIds + .Select(x => _currentDiagnosticResultLookup.TryGetValue(x, out var value) ? value : null) + .Where(x => x != null) + .SelectMany(x => x.Diagnostics) + .ToImmutableArray(); } - private async Task AnalyzeProject(Solution solution, IGrouping documentsGroupedByProject, Action decrementRemaining) + private async Task AnalyzeDocument(Solution solution, ProjectId projectId, DocumentId documentId, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + try { - var project = solution.GetProject(documentsGroupedByProject.Key); + var project = solution.GetProject(projectId); var allAnalyzers = GetAnalyzersForProject(project); var compilation = await project.GetCompilationAsync(); var workspaceAnalyzerOptions = CreateAnalyzerOptions(project); - var documentAnalyzerTasks = new List(); + var document = project.GetDocument(documentId); - foreach (var documentId in documentsGroupedByProject) - { - await _throttler.WaitAsync(); + var diagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document, cancellationToken); - documentAnalyzerTasks.Add(Task.Run(async () => - { - try - { - var document = project.GetDocument(documentId); - var diagnostics = await AnalyzeDocument(project, allAnalyzers, compilation, workspaceAnalyzerOptions, document); - UpdateCurrentDiagnostics(project, document, diagnostics); - decrementRemaining(); - } - finally - { - _throttler.Release(); - } - })); - } - - await Task.WhenAll(documentAnalyzerTasks); + UpdateCurrentDiagnostics(project, document, diagnostics); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; } catch (Exception ex) { - _logger.LogError($"Analysis of project {documentsGroupedByProject.Key} failed, underlaying error: {ex}"); + _logger.LogError($"Analysis of document {documentId} failed, underlying error: {ex}"); } } - private async Task> AnalyzeDocument(Project project, ImmutableArray allAnalyzers, Compilation compilation, AnalyzerOptions workspaceAnalyzerOptions, Document document) + private async Task> AnalyzeDocument(Project project, ImmutableArray allAnalyzers, Compilation compilation, AnalyzerOptions workspaceAnalyzerOptions, Document document, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + + // There's real possibility that bug in analyzer causes analysis hang at document. + using var perDocumentTimeout = + new CancellationTokenSource(_options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs); + using var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, perDocumentTimeout.Token); + try { - // There's real possibility that bug in analyzer causes analysis hang at document. - CancellationToken cancellationToken = new CancellationTokenSource( - _options.RoslynExtensionsOptions.DocumentAnalysisTimeoutMs) - .Token; + var documentSemanticModel = await document.GetSemanticModelAsync(combinedCancellation.Token); // Analyzers cannot be called with empty analyzer list. bool canDoFullAnalysis = allAnalyzers.Length > 0 && (!_options.RoslynExtensionsOptions.AnalyzeOpenDocumentsOnly || _workspace.IsDocumentOpen(document.Id)); - SemanticModel documentSemanticModel = await document.GetSemanticModelAsync(cancellationToken); SyntaxTree syntaxTree = documentSemanticModel.SyntaxTree; SyntaxTreeOptionsProvider provider = compilation.Options.SyntaxTreeOptionsProvider; - GeneratedKind kind = provider.IsGenerated(syntaxTree, cancellationToken); - if (kind is GeneratedKind.MarkedGenerated || syntaxTree.IsAutoGenerated(cancellationToken)) + GeneratedKind kind = provider.IsGenerated(syntaxTree, combinedCancellation.Token); + if (kind is GeneratedKind.MarkedGenerated || syntaxTree.IsAutoGenerated(combinedCancellation.Token)) { return Enumerable.Empty().ToImmutableArray(); } @@ -332,12 +300,12 @@ private async Task> AnalyzeDocument(Project project, // Those projects are on hard coded virtual project if (project.Name == $"{Configuration.OmniSharpMiscProjectName}.csproj") { - return syntaxTree.GetDiagnostics().ToImmutableArray(); + return syntaxTree.GetDiagnostics(cancellationToken: combinedCancellation.Token).ToImmutableArray(); } if (!canDoFullAnalysis) { - return documentSemanticModel.GetDiagnostics(); + return documentSemanticModel.GetDiagnostics(cancellationToken: combinedCancellation.Token); } CompilationWithAnalyzers compilationWithAnalyzers = compilation.WithAnalyzers(allAnalyzers, new CompilationWithAnalyzersOptions( @@ -348,12 +316,12 @@ private async Task> AnalyzeDocument(Project project, reportSuppressedDiagnostics: false)); Task> semanticDiagnosticsWithAnalyzers = compilationWithAnalyzers - .GetAnalyzerSemanticDiagnosticsAsync(documentSemanticModel, filterSpan: null, cancellationToken); + .GetAnalyzerSemanticDiagnosticsAsync(documentSemanticModel, filterSpan: null, combinedCancellation.Token); Task> syntaxDiagnosticsWithAnalyzers = compilationWithAnalyzers - .GetAnalyzerSyntaxDiagnosticsAsync(syntaxTree, cancellationToken); + .GetAnalyzerSyntaxDiagnosticsAsync(syntaxTree, combinedCancellation.Token); - ImmutableArray documentSemanticDiagnostics = documentSemanticModel.GetDiagnostics(null, cancellationToken); + ImmutableArray documentSemanticDiagnostics = documentSemanticModel.GetDiagnostics(null, combinedCancellation.Token); await Task.WhenAll(syntaxDiagnosticsWithAnalyzers, semanticDiagnosticsWithAnalyzers); @@ -363,6 +331,10 @@ private async Task> AnalyzeDocument(Project project, .Concat(documentSemanticDiagnostics) .ToImmutableArray(); } + catch (OperationCanceledException) when (combinedCancellation.Token.IsCancellationRequested) + { + throw; + } catch (Exception ex) { _logger.LogError($"Analysis of document {document.Name} failed or cancelled by timeout: {ex.Message}, analysers: {string.Join(", ", allAnalyzers)}"); diff --git a/tests/OmniSharp.Roslyn.CSharp.Tests/AsyncAnalyzerWorkerQueueFacts.cs b/tests/OmniSharp.Roslyn.CSharp.Tests/AsyncAnalyzerWorkerQueueFacts.cs new file mode 100644 index 0000000000..7fad2a433f --- /dev/null +++ b/tests/OmniSharp.Roslyn.CSharp.Tests/AsyncAnalyzerWorkerQueueFacts.cs @@ -0,0 +1,363 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.CodeAnalysis; +using Microsoft.Extensions.Logging; +using OmniSharp.Roslyn.CSharp.Workers.Diagnostics; +using Xunit; + +namespace OmniSharp.Roslyn.CSharp.Tests +{ +#pragma warning disable VSTHRD103 // Call async methods when in an async method + public class AsyncAnalyzerWorkerQueueFacts + { + private class Logger : ILogger + { + public IDisposable BeginScope(TState state) + { + return null; + } + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + RecordedMessages = RecordedMessages.Add(state.ToString()); + } + + public ImmutableArray RecordedMessages { get; set; } = ImmutableArray.Create(); + } + + private class LoggerFactory : ILoggerFactory + { + public Logger Logger { get; } = new Logger(); + + public void AddProvider(ILoggerProvider provider) + { + } + + public ILogger CreateLogger(string categoryName) + { + return Logger; + } + + public void Dispose() + { + } + } + + [Theory] + [InlineData(AnalyzerWorkType.Background)] + [InlineData(AnalyzerWorkType.Foreground)] + public async Task WhenWorksIsAddedToQueueThenTheyWillBeReturned(AnalyzerWorkType workType) + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, workType); + + var work = await queue.TakeWorkAsyncWithTimeout(); + + Assert.Equal(document, work.DocumentId); + Assert.Equal(0, queue.PendingCount); + } + + [Fact] + public async Task WhenForegroundWorkIsAddedThenWaitNextIterationOfItReady() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(500); + + Assert.False(pendingTask.IsCompleted); + + var work = await queue.TakeWorkAsyncWithTimeout(); + + queue.WorkComplete(work); + + pendingTask.Wait(TimeSpan.FromMilliseconds(50)); + + Assert.True(pendingTask.IsCompleted); + } + + [Fact] + public async Task WhenForegroundWorkIsUnderAnalysisOutFromQueueThenWaitUntilNextIterationOfItIsReady() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + var work = await queue.TakeWorkAsync(); + + var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(500); + pendingTask.Wait(TimeSpan.FromMilliseconds(50)); + + Assert.False(pendingTask.IsCompleted); + queue.WorkComplete(work); + pendingTask.Wait(TimeSpan.FromMilliseconds(50)); + Assert.True(pendingTask.IsCompleted); + } + + [Fact] + public async Task WhenMultipleThreadsAreConsumingAnalyzerWorkerQueueItWorksAsExpected() + { + var now = DateTime.UtcNow; + + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + + var parallelQueues = + Enumerable.Range(0, 10) + .Select(_ => + Task.Run(async () => + { + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + var work = await queue.TakeWorkAsync(); + + var pendingTask = queue.WaitForegroundWorkCompleteWithTimeout(1000); + + var pendingTask2 = queue.WaitForegroundWorkCompleteWithTimeout(1000); + + pendingTask.Wait(TimeSpan.FromMilliseconds(300)); + })) + .ToArray(); + + await Task.WhenAll(parallelQueues); + + Assert.Equal(0, queue.PendingCount); + } + + [Fact] + public async Task WhenNewWorkIsAddedAgainWhenPreviousIsAnalysing_ThenDontWaitAnotherOneToGetReady() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document1 = CreateTestDocumentId(); + var document2 = CreateTestDocumentId(); + + queue.PutWork(new[] { document1 }, AnalyzerWorkType.Foreground); + + var work = await queue.TakeWorkAsync(); + var waitingCall = Task.Run(async () => await queue.WaitForegroundWorkCompleteWithTimeout(10 * 1000)); + await Task.Delay(50); + + // User updates code -> document is queued again during period when theres already api call waiting + // to continue. + queue.PutWork(new[] { document2 }, AnalyzerWorkType.Foreground); + + // First iteration of work is done. + queue.WorkComplete(work); + + // Waiting call continues because its iteration of work is done, even when theres next + // already waiting. + waitingCall.Wait(50); + + Assert.True(waitingCall.IsCompleted); + } + + [Fact] + public async Task WhenWorkIsAddedAgainWhenPreviousIsAnalysing_ThenContinueWaiting() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + var work = await queue.TakeWorkAsync(); + var waitingCall = Task.Run(async () => await queue.WaitForegroundWorkCompleteWithTimeout(10 * 1000)); + await Task.Delay(50); + + // User updates code -> document is queued again during period when theres already api call waiting + // to continue. + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + // First iteration of work is done. + queue.WorkComplete(work); + + // Waiting call continues because its iteration of work is done, even when theres next + // already waiting. + waitingCall.Wait(50); + + Assert.False(waitingCall.IsCompleted); + } + + [Fact] + public void WhenBackgroundWorkIsAdded_DontWaitIt() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + Assert.True(queue.WaitForegroundWorkComplete().IsCompleted); + } + + [Fact] + public void WhenSingleFileIsPromoted_ThenPromoteItFromBackgroundQueueToForeground() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + queue.TryPromote(document); + + Assert.NotEqual(0, queue.PendingCount); + } + + [Fact] + public void WhenFileIsntAtBackgroundQueueAndTriedToBePromoted_ThenDontDoNothing() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.TryPromote(document); + + Assert.Equal(0, queue.PendingCount); + } + + [Fact] + public async Task WhenFileIsProcessingInBackgroundQueue_ThenPromoteItAsForeground() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + await queue.TakeWorkAsyncWithTimeout(); + + queue.TryPromote(document); + + await queue.TakeWorkAsyncWithTimeout(); + } + + [Fact] + public async Task WhenFileIsAddedMultipleTimes_DuplicatesAreIgnored() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + await queue.TakeWorkAsyncWithTimeout(); + + await Assert.ThrowsAsync(() => + queue.TakeWorkAsyncWithTimeout()); + } + + [Fact] + public async Task WhenFileIsAddedWhileProcessing_ThePeviousRunIsCancelled() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + var result = await queue.TakeWorkAsyncWithTimeout(); + + Assert.False(result.CancellationToken.IsCancellationRequested); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Background); + + Assert.True(result.CancellationToken.IsCancellationRequested); + } + + [Fact] + public async Task WhenQueueIsEmpty_TakeWorkRespondsToCancellation() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + await Assert.ThrowsAsync(() => + queue.TakeWorkAsyncWithTimeout()); + } + + [Fact] + public async Task WhenAwaitingForForgroundWork_CancellationIsHandled() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document = CreateTestDocumentId(); + + queue.PutWork(new[] { document }, AnalyzerWorkType.Foreground); + + var isCancelled = await queue.WaitForegroundWorkCompleteWithTimeout(50); + + Assert.True(isCancelled); + } + + [Fact] + public async Task WhenDequeingWork_ItsReturnedInOrderForgroundFirst() + { + var queue = new AsyncAnalyzerWorkQueue(new LoggerFactory()); + var document1 = CreateTestDocumentId(); + var document2 = CreateTestDocumentId(); + var document3 = CreateTestDocumentId(); + var document4 = CreateTestDocumentId(); + + queue.PutWork(new[] { document3 }, AnalyzerWorkType.Background); + + queue.PutWork(new[] { document1 }, AnalyzerWorkType.Foreground); + + queue.PutWork(new[] { document4 }, AnalyzerWorkType.Background); + + queue.PutWork(new[] { document2 }, AnalyzerWorkType.Foreground); + + var result1 = await queue.TakeWorkAsyncWithTimeout(); + + Assert.Equal(document1, result1.DocumentId); + + var result2 = await queue.TakeWorkAsyncWithTimeout(); + + Assert.Equal(document2, result2.DocumentId); + + var result3 = await queue.TakeWorkAsyncWithTimeout(); + + Assert.Equal(document3, result3.DocumentId); + + var result4 = await queue.TakeWorkAsyncWithTimeout(); + + Assert.Equal(document4, result4.DocumentId); + } + + private static DocumentId CreateTestDocumentId() + { + var projectInfo = ProjectInfo.Create( + id: ProjectId.CreateNewId(), + version: VersionStamp.Create(), + name: "testProject", + assemblyName: "AssemblyName", + language: LanguageNames.CSharp); + + return DocumentId.CreateNewId(projectInfo.Id); + } + } + + public static class AsyncAnalyzerWorkerQueueFactsExtensions + { + public static async Task TakeWorkAsyncWithTimeout(this AsyncAnalyzerWorkQueue queue) + { + using var cts = new CancellationTokenSource(50); + + return await queue.TakeWorkAsync(cts.Token); + } + + public static async Task WaitForegroundWorkCompleteWithTimeout(this AsyncAnalyzerWorkQueue queue, int timeout) + { + using var cts = new CancellationTokenSource(timeout); + + await queue.WaitForegroundWorkComplete(cts.Token); + + return cts.Token.IsCancellationRequested; + } + } +#pragma warning restore VSTHRD103 // Call async methods when in an async method +} diff --git a/tests/OmniSharp.Roslyn.CSharp.Tests/TestEventEmitter.cs b/tests/OmniSharp.Roslyn.CSharp.Tests/TestEventEmitter.cs index da0e41afc2..fddcb1dce4 100644 --- a/tests/OmniSharp.Roslyn.CSharp.Tests/TestEventEmitter.cs +++ b/tests/OmniSharp.Roslyn.CSharp.Tests/TestEventEmitter.cs @@ -1,46 +1,75 @@ using System; -using System.Collections.Concurrent; -using System.Collections.Immutable; +using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; +using Newtonsoft.Json; using OmniSharp.Eventing; namespace OmniSharp.Roslyn.CSharp.Tests { public class TestEventEmitter : IEventEmitter + { + private readonly object _lock = new(); + private readonly List _messages = new(); + private readonly List<(Predicate Predicate, TaskCompletionSource TaskCompletionSource)> _predicates = new(); + + public async Task ExpectForEmitted(Expression> predicate) { - public ImmutableArray Messages { get; private set; } = ImmutableArray.Empty; + var asCompiledPredicate = predicate.Compile(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - public async Task ExpectForEmitted(Expression> predicate) + lock (_lock) { - var asCompiledPredicate = predicate.Compile(); + if (_messages.Any(m => asCompiledPredicate(m))) + return; - // May seem hacky but nothing is more painfull to debug than infinite hanging test ... - for(int i = 0; i < 100; i++) - { - if(Messages.Any(m => asCompiledPredicate(m))) - { - return; - } + _predicates.Add((asCompiledPredicate, tcs)); + } - await Task.Delay(250); - } + try + { + using var cts = new CancellationTokenSource(25000); - throw new InvalidOperationException($"Timeout reached before expected event count reached before prediction {predicate} came true, current diagnostics '{String.Join(";", Messages)}'"); + cts.Token.Register(() => tcs.SetCanceled()); + + await tcs.Task; } + catch (OperationCanceledException) + { + var messages = string.Join(";", _messages.Select(x => JsonConvert.SerializeObject(x))); - public void Clear() + throw new InvalidOperationException($"Timeout reached before expected event count reached before prediction {predicate} came true, current diagnostics '{messages}'"); + } + finally { - Messages = ImmutableArray.Empty; + lock (_lock) + _predicates.Remove((asCompiledPredicate, tcs)); } + } + + public void Clear() + { + lock (_lock) + _messages.Clear(); + } - public void Emit(string kind, object args) + public void Emit(string kind, object args) + { + if (args is T asT) { - if(args is T asT) + lock (_lock) { - Messages = Messages.Add(asT); + _messages.Add(asT); + + foreach (var (predicate, tcs) in _predicates) + { + if (predicate(asT)) + tcs.SetResult(null); + } } } } -} \ No newline at end of file + } +}