Skip to content

Commit

Permalink
Support asynchronous invocation of IODataStreamListener.StreamDispose…
Browse files Browse the repository at this point in the history
…dAsync method from ODataNotificationWriter class (#2079)

* Support asynchronous invocation of IODataStreamListener.StreamDisposedAsync method from ODataNotificationWriter class

* Make ODataNotificationWriter Dispose and DisposeAsync methods idempotent

Co-authored-by: John Gathogo <[email protected]>
  • Loading branch information
gathogojr and John Gathogo authored May 20, 2021
1 parent 1ef7c56 commit 9449c28
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 10 deletions.
55 changes: 45 additions & 10 deletions src/Microsoft.OData.Core/ODataNotificationWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,26 @@ namespace Microsoft.OData
/// <summary>
/// Wrapper for TextWriter to listen for dispose.
/// </summary>
#if NETSTANDARD2_0
internal sealed class ODataNotificationWriter : TextWriter, IAsyncDisposable
#else
internal sealed class ODataNotificationWriter : TextWriter
#endif
{
private readonly TextWriter textWriter;
private TextWriter textWriter;
private IODataStreamListener listener;
private bool disposed = false;
private readonly bool synchronous;

internal ODataNotificationWriter(TextWriter textWriter, IODataStreamListener listener)
internal ODataNotificationWriter(TextWriter textWriter, IODataStreamListener listener, bool synchronous = true)
: base(System.Globalization.CultureInfo.InvariantCulture)
{
Debug.Assert(textWriter != null, "Creating a notification writer for a null textWriter.");
Debug.Assert(listener != null, "Creating a notification writer with a null listener.");

this.textWriter = textWriter;
this.listener = listener;
this.synchronous = synchronous;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -81,6 +88,7 @@ public override string ToString()
}

#region Write methods

/// <inheritdoc/>
public override void Write(char value)
{
Expand Down Expand Up @@ -168,6 +176,7 @@ public override void Write(ulong value)
#endregion

#region WriteLine methods

/// <inheritdoc/>
public override void WriteLine()
{
Expand Down Expand Up @@ -262,7 +271,6 @@ public override void WriteLine(ulong value)

#region async methods


/// <inheritdoc/>
public override Task FlushAsync()
{
Expand Down Expand Up @@ -319,18 +327,45 @@ public override Task WriteLineAsync(string value)
/// <param name="disposing">True if called from Dispose; false if called from the finalizer.</param>
protected override void Dispose(bool disposing)
{
if (disposing)
if (!this.disposed && disposing)
{
if (this.listener != null)
// Tell the listener that the stream is being disposed.
if (synchronous)
{
// Tell the listener that the stream is being disposed.
this.listener.StreamDisposed();
this.listener = null;
this.listener?.StreamDisposed();
}
else
{
this.listener?.StreamDisposedAsync().Wait();
}

this.listener = null;

this.textWriter?.Dispose();
this.textWriter = null;
}

this.textWriter.Dispose();
this.disposed = true;
base.Dispose(disposing);
}

#if NETSTANDARD2_0
public async ValueTask DisposeAsync()
{
if (!this.disposed && this.listener != null)
{
await this.listener.StreamDisposedAsync()
.ConfigureAwait(false);
this.textWriter?.Dispose();

this.listener = null;
this.textWriter = null;
}

// Dispose unmanaged resources
// Pass `false` to ensure functional equivalence with the synchronous dispose pattern
this.Dispose(false);
}
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,16 @@
<ProjectReference Include="..\Microsoft.OData.TestCommon\Microsoft.OData.TestCommon.csproj" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp2.1'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces">
<Version>5.0.0</Version>
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces">
<Version>5.0.0</Version>
</PackageReference>
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//---------------------------------------------------------------------
// <copyright file="ODataNotificationWriterTests.cs" company="Microsoft">
// Copyright (C) Microsoft Corporation. All rights reserved. See License.txt in the project root for license information.
// </copyright>
//---------------------------------------------------------------------

using System;
using System.IO;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.OData.Tests
{
public class ODataNotificationWriterTests
{
private MemoryStream stream;
private TextWriter writer;
private IODataStreamListener streamListener;

public ODataNotificationWriterTests()
{
this.stream = new MemoryStream();
this.writer = new StreamWriter(this.stream);
this.streamListener = new MockODataStreamListener(this.writer);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationWriterDisposeShouldInvokeStreamDisposed(bool synchronous, string expected)
{
// We care about the notification writer being disposed
// We don't care about the writer passed to the notification writer
using (var notificationWriter = new ODataNotificationWriter(
new StreamWriter(new MemoryStream()),
this.streamListener,
synchronous))
{
}

var result = ReadStreamContents();

Assert.Equal(expected, result);
}

[Theory]
[InlineData(true, "StreamDisposed")]
[InlineData(false, "StreamDisposedAsync")]
public void NotificationWriterDisposeShouldBeIdempotent(bool synchronous, string expected)
{
var notificationWriter = new ODataNotificationWriter(
new StreamWriter(new MemoryStream()),
this.streamListener,
synchronous);

// 1st call to Dispose
notificationWriter.Dispose();
// 2nd call to Dispose
notificationWriter.Dispose();

var result = ReadStreamContents();

// StreamDisposed/StreamDisposeAsync was written only once
Assert.Equal(expected, result);
}

#if NETCOREAPP3_1
[Fact]
public async Task NotificationWriterDisposeShouldInvokeStreamDisposedAsync()
{
await using (var notificationWriter = new ODataNotificationWriter(
new StreamWriter(new MemoryStream()),
this.streamListener)) // `synchronous` argument becomes irrelevant since we'll directly call DisposeAsync
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}

[Fact]
public async Task NotificationWriterDisposeAsyncShouldBeIdempotent()
{
var notificationWriter = new ODataNotificationWriter(
new StreamWriter(new MemoryStream()),
this.streamListener);

// 1st call to DisposeAsync
await notificationWriter.DisposeAsync();
// 2nd call to DisposeAsync
await notificationWriter.DisposeAsync();

var result = await this.ReadStreamContentsAsync();

// StreamDisposeAsync was written only once
Assert.Equal("StreamDisposedAsync", result);
}

#else
[Fact]
public async Task NotificationWriterDisposeShouldInvokeStreamDisposedAsync()
{
using (var notificationWriter = new ODataNotificationWriter(
new StreamWriter(new MemoryStream()),
this.streamListener,
/*synchronous*/ false))
{
}

var result = await this.ReadStreamContentsAsync();

Assert.Equal("StreamDisposedAsync", result);
}
#endif

private string ReadStreamContents()
{
this.stream.Position = 0;
return new StreamReader(this.stream).ReadToEnd();
}

private Task<string> ReadStreamContentsAsync()
{
this.stream.Position = 0;
return new StreamReader(this.stream).ReadToEndAsync();
}

private class MockODataStreamListener : IODataStreamListener
{
private TextWriter writer;

public MockODataStreamListener(TextWriter writer)
{
this.writer = writer;
}

public void StreamDisposed()
{
writer.Write("StreamDisposed");
writer.Flush();
}

public async Task StreamDisposedAsync()
{
await writer.WriteAsync("StreamDisposedAsync").ConfigureAwait(false);
await writer.FlushAsync().ConfigureAwait(false);
}

public void StreamRequested()
{
throw new NotImplementedException();
}

public Task StreamRequestedAsync()
{
throw new NotImplementedException();
}
}
}
}

0 comments on commit 9449c28

Please sign in to comment.