Skip to content

Commit

Permalink
Implement IAsyncDisposable for ODataMessageWriter (#2683)
Browse files Browse the repository at this point in the history
* Implement IAsyncDisposable on ODataMessageWriter and OutputContext

* Create stream wrapper to detect sync I/O in async code

* Allow synchronous I/O in WriteMetadataAsync()

* Add tests

* Allow synchronous flush in older frameworks

* Disable synchronous I/O in async writer tests

* Fix failing tests on older frameworks

* Refactor based on review comments

* Address review comments

* Remove whitespace

* Remove sync I/O in more async tests

* Fix failing test in .netcoreapp2.1
  • Loading branch information
habbes authored Jun 20, 2023
1 parent 3133a34 commit 22b8a0a
Show file tree
Hide file tree
Showing 39 changed files with 1,520 additions and 121 deletions.
2 changes: 1 addition & 1 deletion src/Microsoft.OData.Core/AsyncBufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ public override long Position
/// </summary>
public override void Flush()
{
// no-op
// This can be called from writers that are put on top of this stream when
// they are closed/disposed
this.FlushSync();
}

/// <summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.OData.Core/Json/JsonWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ namespace Microsoft.OData.Json
using Microsoft.OData.Edm;
#if NETCOREAPP3_1_OR_GREATER
using System.Text.Json;
using System.Threading.Tasks;
#endif
#endregion Namespaces

/// <summary>
/// Writer for the JSON format. http://www.json.org
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", Justification = "This class does not own the underlying stream/writer and thus should never dispose it.")]
#if NETCOREAPP3_1_OR_GREATER
internal sealed partial class JsonWriter : IJsonStreamWriter, IJsonStreamWriterAsync, IDisposable, IAsyncDisposable
#else
internal sealed partial class JsonWriter : IJsonStreamWriter, IJsonStreamWriterAsync, IDisposable
#endif
{
/// <summary>
/// Writer to write text into.
Expand Down
28 changes: 28 additions & 0 deletions src/Microsoft.OData.Core/Json/JsonWriterAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,30 @@ public Task FlushAsync()
return this.writer.FlushAsync();
}

#if NETCOREAPP3_1_OR_GREATER
public async ValueTask DisposeAsync()
{
if (this.ArrayPool != null && this.wrappedBuffer.Value != null)
{
BufferUtils.ReturnToBuffer(this.ArrayPool, this.wrappedBuffer.Value);
this.ArrayPool = null;
this.wrappedBuffer.Value = null;
}

if (this.binaryValueStream != null)
{
try
{
await this.binaryValueStream.DisposeAsync().ConfigureAwait(false);
}
finally
{
this.binaryValueStream = null;
}
}
}
#endif

/// <inheritdoc/>
public async Task<Stream> StartStreamValueScopeAsync()
{
Expand All @@ -399,7 +423,11 @@ public async Task<Stream> StartStreamValueScopeAsync()
public async Task EndStreamValueScopeAsync()
{
await this.binaryValueStream.FlushAsync().ConfigureAwait(false);
#if NETCOREAPP3_1_OR_GREATER
await this.binaryValueStream.DisposeAsync().ConfigureAwait(false);
#else
this.binaryValueStream.Dispose();
#endif
this.binaryValueStream = null;
await this.writer.FlushAsync().ConfigureAwait(false);
await this.writer.WriteAsync(JsonConstants.QuoteCharacter).ConfigureAwait(false);
Expand Down
89 changes: 72 additions & 17 deletions src/Microsoft.OData.Core/JsonLight/ODataJsonLightOutputContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -858,46 +858,102 @@ await this.FlushAsync()
/// <param name="disposing">If 'true' this method is called from user code; if 'false' it is called by the runtime.</param>
[SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "textWriter", Justification = "We don't dispose the jsonWriter or textWriter, instead we dispose the underlying stream directly.")]
protected override void Dispose(bool disposing)
{
if (disposing)
{
try
{
if (this.messageOutputStream != null)
{
// The IJsonWriter will flush the underlying stream
this.jsonWriter.Flush();

if (this.jsonWriter is IDisposable disposableWriter)
{
disposableWriter.Dispose();
}

// In the async case the underlying stream is the async buffered stream, so we have to flush that explicitly.
if (this.asynchronousOutputStream != null)
{
#if NETSTANDARD1_1
this.asynchronousOutputStream.FlushSync();
this.asynchronousOutputStream.Dispose();
#else
this.asynchronousOutputStream.Flush();
// We are working with a BufferedStream here. We flushed it already, so there is nothing else to dispose. And it would dispose the
// inner stream as well.
#endif
}

// Dispose the message stream (note that we OWN this stream, so we always dispose it).
this.messageOutputStream.Dispose();
}

if (this.BinaryValueStream != null)
{
this.BinaryValueStream.Flush();
this.BinaryValueStream.Dispose();
}

if (this.StringWriter != null)
{
this.StringWriter.Flush();
this.StringWriter.Dispose();
}
}
finally
{
this.messageOutputStream = null;
this.asynchronousOutputStream = null;
this.BinaryValueStream = null;
this.textWriter = null;
this.jsonWriter = null;
this.StringWriter = null;
}
}

base.Dispose(disposing);
}

#if NETCOREAPP3_1_OR_GREATER
protected override async ValueTask DisposeAsyncCore()
{
try
{
if (this.messageOutputStream != null)
{
// The IJsonWriter will flush the underlying stream
this.jsonWriter.Flush();

if (this.jsonWriter is IDisposable disposableWriter)
// The IJsonWriterAsync will flush the underlying stream
await this.asynchronousJsonWriter.FlushAsync().ConfigureAwait(false);

if (this.asynchronousJsonWriter is IAsyncDisposable asyncDisposableWriter)
{
disposableWriter.Dispose();
await asyncDisposableWriter.DisposeAsync().ConfigureAwait(false);
}

// In the async case the underlying stream is the async buffered stream, so we have to flush that explicitly.
if (this.asynchronousOutputStream != null)
{
#if NETSTANDARD1_1
this.asynchronousOutputStream.FlushSync();
this.asynchronousOutputStream.Dispose();
#else
this.asynchronousOutputStream.Flush();
await this.asynchronousOutputStream.FlushAsync().ConfigureAwait(false);
// We are working with a BufferedStream here. We flushed it already, so there is nothing else to dispose. And it would dispose the
// inner stream as well.
#endif
}

// Dispose the message stream (note that we OWN this stream, so we always dispose it).
this.messageOutputStream.Dispose();
await this.messageOutputStream.DisposeAsync().ConfigureAwait(false);
}

if (this.BinaryValueStream != null)
{
this.BinaryValueStream.Flush();
this.BinaryValueStream.Dispose();
await this.BinaryValueStream.FlushAsync().ConfigureAwait(false);
await this.BinaryValueStream.DisposeAsync().ConfigureAwait(false);
}

if (this.StringWriter != null)
{
this.StringWriter.Flush();
this.StringWriter.Dispose();
await this.StringWriter.FlushAsync().ConfigureAwait(false);
await this.StringWriter.DisposeAsync().ConfigureAwait(false);
}
}
finally
Expand All @@ -909,9 +965,8 @@ protected override void Dispose(bool disposing)
this.jsonWriter = null;
this.StringWriter = null;
}

base.Dispose(disposing);
}
#endif

/// <summary>
/// Creates a new JSON writer of <see cref="IJsonWriter"/>.
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.OData.Core/NonDisposingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public override void Flush()
this.innerStream.Flush();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return this.innerStream.FlushAsync(cancellationToken);
}

/// <summary>
/// Reads data from the stream.
/// </summary>
Expand Down
14 changes: 14 additions & 0 deletions src/Microsoft.OData.Core/ODataBinaryStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

#if NETCOREAPP3_1_OR_GREATER
public override async ValueTask DisposeAsync()
{
// write any trailing bytes to stream
if (this.trailingBytes != null && this.trailingBytes.Length > 0)
{
await this.Writer.WriteAsync(Convert.ToBase64String(this.trailingBytes, 0, this.trailingBytes.Length)).ConfigureAwait(false);
this.trailingBytes = null;
}

await this.Writer.FlushAsync().ConfigureAwait(false);
}
#endif

/// <summary>
/// Prepares bytes to be written for the particular write event
/// </summary>
Expand Down
49 changes: 46 additions & 3 deletions src/Microsoft.OData.Core/ODataMessageWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ namespace Microsoft.OData
using Microsoft.OData.Metadata;
#endregion Namespaces

/// <summary>
/// Writer class used to write all OData payloads (entries, resource sets, metadata documents, service documents, etc.).
/// </summary>
/// <summary>
/// Writer class used to write all OData payloads (entries, resource sets, metadata documents, service documents, etc.).
/// </summary>
#if NETCOREAPP3_1_OR_GREATER
public sealed class ODataMessageWriter : IDisposable, IAsyncDisposable
#else
public sealed class ODataMessageWriter : IDisposable
#endif
{
/// <summary>The message for which the message writer was created.</summary>
private readonly ODataMessage message;
Expand Down Expand Up @@ -730,6 +734,23 @@ public void Dispose()
GC.SuppressFinalize(this);
}

#if NETCOREAPP3_1_OR_GREATER
/// <summary>
/// <see cref="System.IAsyncDisposable.DisposeAsync"/> implementation to asynchronously
/// clean up unmanaged resources of the writer.
/// </summary>
/// <returns>A task representing the asynchronous operation of disposing the writer.</returns>
public async ValueTask DisposeAsync()
{
await this.DisposeAsyncCore().ConfigureAwait(false);
// Calling Dispose(disposing: false) releases unmanaged resources if any
// but does not perform synchronous I/O (e.g. does not call Flush())
// See: https://learn.microsoft.com/en-us/dotnet/standard/garbage-collection/implementing-disposeasync#the-disposeasync-method
this.Dispose(false);
GC.SuppressFinalize(this);
}
#endif

/// <summary>
/// Sets the content-type and OData-Version headers on the message used by the message writer.
/// This method can be called if it is important to set all the message headers before calling any of the
Expand Down Expand Up @@ -1193,6 +1214,28 @@ private void Dispose(bool disposing)
}
}

#if NETCOREAPP3_1_OR_GREATER
/// <summary>
/// Asynchronously performs the actual cleanup work.
/// </summary>
/// <returns>A task representing the asynchronous disposal of the writer.</returns>
private async ValueTask DisposeAsyncCore()
{
this.isDisposed = true;
try
{
if (this.outputContext != null)
{
await this.outputContext.DisposeAsync().ConfigureAwait(false);
}
}
finally
{
this.outputContext = null;
}
}
#endif

/// <summary>
/// Verifies that, if a payload kind has been set via SetHeaders, the payload kind that
/// is being written is the same.
Expand Down
53 changes: 45 additions & 8 deletions src/Microsoft.OData.Core/ODataMetadataJsonOutputContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,22 +175,54 @@ internal static Utf8JsonWriter CreateJsonWriter(Stream stream, ODataMessageWrite
/// </summary>
/// <param name="disposing">If 'true' this method is called from user code; if 'false' it is called by the runtime.</param>
protected override void Dispose(bool disposing)
{
if (disposing)
{
try
{
if (this.jsonWriter != null)
{
if (this.asynchronousOutputStream != null)
{
DisposeOutputStreamAsync().Wait();
}
else
{
this.jsonWriter.Flush();
this.jsonWriter.Dispose();
}

this.messageOutputStream.Dispose();
}
}
finally
{
this.messageOutputStream = null;
this.asynchronousOutputStream = null;
this.jsonWriter = null;
}
}

base.Dispose(disposing);
}

#if NETCOREAPP3_1_OR_GREATER
protected override async ValueTask DisposeAsyncCore()
{
try
{
if (this.jsonWriter != null)
{
await this.jsonWriter.FlushAsync().ConfigureAwait(false);
await this.jsonWriter.DisposeAsync().ConfigureAwait(false);

if (this.asynchronousOutputStream != null)
{
DisposeOutputStreamAsync().Wait();
}
else
{
this.jsonWriter.Flush();
this.jsonWriter.Dispose();
await this.asynchronousOutputStream.FlushAsync().ConfigureAwait(false);
await this.asynchronousOutputStream.DisposeAsync().ConfigureAwait(false);
}

this.messageOutputStream.Dispose();
await this.messageOutputStream.DisposeAsync().ConfigureAwait(false);
}
}
finally
Expand All @@ -200,8 +232,9 @@ protected override void Dispose(bool disposing)
this.jsonWriter = null;
}

base.Dispose(disposing);
await base.DisposeAsyncCore().ConfigureAwait(false);
}
#endif

private void WriteMetadataDocumentImplementation()
{
Expand Down Expand Up @@ -232,7 +265,11 @@ private async Task DisposeOutputStreamAsync()

await this.jsonWriter.FlushAsync().ConfigureAwait(false);
await this.jsonWriter.DisposeAsync().ConfigureAwait(false);
#if NETCOREAPP3_1_OR_GREATER
await this.asynchronousOutputStream.DisposeAsync().ConfigureAwait(false);
#else
this.asynchronousOutputStream.Dispose();
#endif
}
}
}
Expand Down
Loading

0 comments on commit 22b8a0a

Please sign in to comment.