Skip to content

Commit

Permalink
Fix synchronous operations disallowed error when using Kestrel web se…
Browse files Browse the repository at this point in the history
…rver (#2459)

Fix synchronous operations disallowed error when using Kestrel web server
  • Loading branch information
gathogojr authored Jul 20, 2022
1 parent 0188486 commit 27851ed
Show file tree
Hide file tree
Showing 8 changed files with 881 additions and 15 deletions.
9 changes: 6 additions & 3 deletions src/Microsoft.OData.Core/Batch/ODataBatchUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ internal static Uri CreateOperationRequestUri(Uri uri, Uri baseUri, IODataPayloa
/// <param name="batchReaderStream">The batch stream to create the operation read stream for.</param>
/// <param name="headers">The headers of the current part; based on the header we create different, optimized stream implementations.</param>
/// <param name="operationListener">The operation listener to be passed to the newly created read stream.</param>
/// <param name="synchronous">true if the stream is to be created for synchronous operation; false for asynchronous.</param>
/// <returns>A new <see cref="ODataReadStream"/> instance.</returns>
internal static ODataReadStream CreateBatchOperationReadStream(
ODataBatchReaderStream batchReaderStream,
ODataBatchOperationHeaders headers,
IODataStreamListener operationListener)
IODataStreamListener operationListener,
bool synchronous = true)
{
Debug.Assert(batchReaderStream != null, "batchReaderStream != null");
Debug.Assert(operationListener != null, "operationListener != null");
Expand All @@ -96,17 +98,18 @@ internal static ODataReadStream CreateBatchOperationReadStream(
throw new ODataException(Strings.ODataBatchReaderStream_InvalidContentLengthSpecified(contentLengthValue));
}

return ODataReadStream.Create(batchReaderStream, operationListener, length);
return ODataReadStream.Create(batchReaderStream, operationListener, length, synchronous);
}

return ODataReadStream.Create(batchReaderStream, operationListener);
return ODataReadStream.Create(batchReaderStream, operationListener, synchronous);
}

/// <summary>
/// Creates a batch operation write stream over the specified output stream.
/// </summary>
/// <param name="outputStream">The output stream to create the operation write stream over.</param>
/// <param name="operationListener">The operation listener to be passed to the newly created write stream.</param>
/// <param name="synchronous">true if the stream is to be created for synchronous operation; false for asynchronous.</param>
/// <returns>A new <see cref="ODataWriteStream"/> instance.</returns>
internal static ODataWriteStream CreateBatchOperationWriteStream(
Stream outputStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected override ODataBatchOperationRequestMessage CreateOperationRequestMessa
}

ODataBatchOperationRequestMessage requestMessage = BuildOperationRequestMessage(
() => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this),
() => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this, this.InputContext.Synchronous),
httpMethod,
requestUri,
headers,
Expand Down Expand Up @@ -139,7 +139,7 @@ protected override ODataBatchOperationResponseMessage CreateOperationResponseMes
// We don't have correlation of changeset boundary between request and response messages in
// changesets, so use null value for groupId.
ODataBatchOperationResponseMessage responseMessage = BuildOperationResponseMessage(
() => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this),
() => ODataBatchUtils.CreateBatchOperationReadStream(this.batchStream, headers, this, this.InputContext.Synchronous),
statusCode,
headers,
this.currentContentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ await this.StartBatchOperationContentAsync()
// then dispose the batch writer (since we are now writing the operation content) and set the corresponding state.
await this.RawOutputContext.FlushBuffersAsync()
.ConfigureAwait(false);
#if NETCOREAPP3_1_OR_GREATER
await this.DisposeBatchWriterAndSetContentStreamRequestedStateAsync()
.ConfigureAwait(false);
#else
this.DisposeBatchWriterAndSetContentStreamRequestedState();
#endif
}

/// <summary>
Expand Down Expand Up @@ -752,5 +757,19 @@ await this.RawOutputContext.TextWriter.WriteLineAsync()
}
}
}

#if NETCOREAPP3_1_OR_GREATER
/// <summary>
/// Asynchronously disposes the batch writer and set the 'OperationStreamRequested' batch writer state;
/// called after the flush operation(s) have completed.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
private async Task DisposeBatchWriterAndSetContentStreamRequestedStateAsync()
{
await this.RawOutputContext.CloseWriterAsync().ConfigureAwait(false);

this.SetState(BatchWriterState.OperationStreamRequested);
}
#endif
}
}
14 changes: 14 additions & 0 deletions src/Microsoft.OData.Core/ODataRawOutputContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,20 @@ internal Task FlushBuffersAsync()
}
}

#if NETCOREAPP3_1_OR_GREATER
/// <summary>
/// Closes the text writer asynchronously.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
internal async Task CloseWriterAsync()
{
Debug.Assert(this.rawValueWriter != null, "The text writer has not been initialized yet.");

await this.rawValueWriter.DisposeAsync().ConfigureAwait(false);
this.rawValueWriter = null;
}
#endif

/// <summary>
/// Perform the actual cleanup work.
/// </summary>
Expand Down
37 changes: 27 additions & 10 deletions src/Microsoft.OData.Core/ODataReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ internal abstract class ODataReadStream : ODataStream
/// </summary>
/// <param name="batchReaderStream">The underlying stream to read from.</param>
/// <param name="listener">Listener interface to be notified of operation changes.</param>
private ODataReadStream(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener)
: base(listener)
/// <param name="synchronous">true if the stream is created for synchronous operation; false for asynchronous.</param>
private ODataReadStream(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, bool synchronous)
: base(listener, synchronous)
{
Debug.Assert(batchReaderStream != null, "batchReaderStream != null");
this.batchReaderStream = batchReaderStream;
Expand Down Expand Up @@ -111,21 +112,30 @@ public override void Write(byte[] buffer, int offset, int count)
/// <param name="batchReaderStream">The batch stream underlying the operation stream to create.</param>
/// <param name="listener">The batch operation listener.</param>
/// <param name="length">The content length of the operation stream.</param>
/// <param name="synchronous">true if the stream is created for synchronous operation; false for asynchronous.</param>
/// <returns>A <see cref="ODataReadStream"/> to read the content of a batch operation from.</returns>
internal static ODataReadStream Create(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, int length)
internal static ODataReadStream Create(
ODataBatchReaderStream batchReaderStream,
IODataStreamListener listener,
int length,
bool synchronous = true)
{
return new ODataBatchOperationReadStreamWithLength(batchReaderStream, listener, length);
return new ODataBatchOperationReadStreamWithLength(batchReaderStream, listener, length, synchronous);
}

/// <summary>
/// Create a batch operation read stream over the specified batch stream using the batch delimiter to detect the end of the stream.
/// </summary>
/// <param name="batchReaderStream">The batch stream underlying the operation stream to create.</param>
/// <param name="listener">The batch operation listener.</param>
/// <param name="synchronous">true if the stream is created for synchronous operation; false for asynchronous.</param>
/// <returns>A <see cref="ODataReadStream"/> to read the content of a batch operation from.</returns>
internal static ODataReadStream Create(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener)
internal static ODataReadStream Create(
ODataBatchReaderStream batchReaderStream,
IODataStreamListener listener,
bool synchronous = true)
{
return new ODataBatchOperationReadStreamWithDelimiter(batchReaderStream, listener);
return new ODataBatchOperationReadStreamWithDelimiter(batchReaderStream, listener, synchronous);
}

/// <summary>
Expand All @@ -142,8 +152,12 @@ private sealed class ODataBatchOperationReadStreamWithLength : ODataReadStream
/// <param name="batchReaderStream">The underlying batch stream to write the message to.</param>
/// <param name="listener">Listener interface to be notified of operation changes.</param>
/// <param name="length">The total length of the stream.</param>
internal ODataBatchOperationReadStreamWithLength(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener, int length)
: base(batchReaderStream, listener)
internal ODataBatchOperationReadStreamWithLength(
ODataBatchReaderStream batchReaderStream,
IODataStreamListener listener,
int length,
bool synchronous)
: base(batchReaderStream, listener, synchronous)
{
ExceptionUtils.CheckIntegerNotNegative(length, "length");
this.length = length;
Expand Down Expand Up @@ -189,8 +203,11 @@ private sealed class ODataBatchOperationReadStreamWithDelimiter : ODataReadStrea
/// </summary>
/// <param name="batchReaderStream">The underlying batch stream to write the message to.</param>
/// <param name="listener">Listener interface to be notified of operation changes.</param>
internal ODataBatchOperationReadStreamWithDelimiter(ODataBatchReaderStream batchReaderStream, IODataStreamListener listener)
: base(batchReaderStream, listener)
internal ODataBatchOperationReadStreamWithDelimiter(
ODataBatchReaderStream batchReaderStream,
IODataStreamListener listener,
bool synchronous)
: base(batchReaderStream, listener, synchronous)
{
}

Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.OData.Core/ODataWriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal sealed class ODataWriteStream : ODataStream
/// </summary>
/// <param name="stream">The underlying stream to write the message to.</param>
/// <param name="listener">Listener interface to be notified of operation changes.</param>
/// <param name="synchronous">true if the stream is created for synchronous operation; false for asynchronous.</param>
internal ODataWriteStream(Stream stream, IODataStreamListener listener, bool synchronous = true)
: base(listener, synchronous)
{
Expand Down
28 changes: 28 additions & 0 deletions src/Microsoft.OData.Core/RawValueWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ namespace Microsoft.OData
/// <summary>
/// Class that handles writing top level raw values to a stream.
/// </summary>
#if NETCOREAPP3_1_OR_GREATER
internal sealed class RawValueWriter : IDisposable, IAsyncDisposable
#else
internal sealed class RawValueWriter : IDisposable
#endif
{
/// <summary>
/// Writer settings.
Expand Down Expand Up @@ -86,6 +90,30 @@ public void Dispose()
this.textWriter = null;
}

#if NETCOREAPP3_1_OR_GREATER
/// <summary>
/// Asynchronously disposes the <see cref="RawValueWriter"/>.
/// It flushes itself and then disposes its inner <see cref="System.IO.TextWriter"/>.
/// </summary>
/// <returns>A task that represents the asynchronous dispose operation.</returns>
public ValueTask DisposeAsync()
{
return DisposeInnerAsync();

async ValueTask DisposeInnerAsync()
{
Debug.Assert(this.textWriter != null, "The text writer has not been initialized yet.");

if (this.textWriter != null)
{
await this.textWriter.DisposeAsync().ConfigureAwait(false);
}

this.textWriter = null;
}
}
#endif

/// <summary>
/// Start writing a raw output. This should only be called once.
/// </summary>
Expand Down
Loading

0 comments on commit 27851ed

Please sign in to comment.