Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support case insensitive cloudevent payloads and forward cloudevent props s headers #1153

Merged
merged 12 commits into from
Oct 17, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Linq;

namespace ControllerSample.Controllers
{
using System;
Expand Down Expand Up @@ -43,6 +45,7 @@ public SampleController(ILogger<SampleController> logger)
/// State store name.
/// </summary>
public const string StoreName = "statestore";

private readonly ILogger<SampleController> logger;

/// <summary>
Expand Down Expand Up @@ -72,6 +75,11 @@ public ActionResult<Account> Get([FromState(StoreName)] StateEntry<Account> acco
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
// Example reading cloudevent properties from the headers
var headerEntries = Request.Headers.Aggregate("", (current, header) => current + ($"------- Header: {header.Key} : {header.Value}" + Environment.NewLine));

logger.LogInformation(headerEntries);

logger.LogInformation("Enter deposit");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
Expand All @@ -83,7 +91,7 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
}

state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance);
logger.LogInformation("Balance for Id {0} is {1}", state.Value.Id, state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
Expand All @@ -98,22 +106,23 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit", 500, 2000)]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody]
BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter bulk deposit");

List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();

foreach (var entry in bulkMessage.Entries)
{
{
try
{
var transaction = entry.Event.Data;

var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
Expand All @@ -124,12 +133,16 @@ public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
entries.Add(
new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}

return new BulkSubscribeAppResponse(entries);
}

Expand Down Expand Up @@ -165,6 +178,7 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
{
return this.NotFound();
}

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
Expand All @@ -185,7 +199,8 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "withdraw", "event.type ==\"withdraw.v2\"", 1)]
[HttpPost("withdraw.v2")]
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter withdraw.v2");
if (transaction.Channel == "mobile" && transaction.Amount > 10000)
Expand Down Expand Up @@ -214,12 +229,15 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "rawDeposit", true)]
[HttpPost("rawDeposit")]
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction,
[FromServices] DaprClient daprClient)
{
var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString();
logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
logger.LogInformation(
$"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
var transactionJson = JsonSerializer.Deserialize<JsonDocument>(Convert.FromBase64String(transactionString));
var transaction = JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var transaction =
JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
Expand All @@ -239,7 +257,8 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
/// </summary>
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> ThrowException(Transaction transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter ThrowException");
var task = Task.Delay(10);
Expand Down
8 changes: 7 additions & 1 deletion examples/AspNetCore/ControllerSample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
// limitations under the License.
// ------------------------------------------------------------------------


using Dapr;
using Dapr.AspNetCore;


namespace ControllerSample
{
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -63,7 +66,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

app.UseRouting();

app.UseCloudEvents();
app.UseCloudEvents(new CloudEventsMiddlewareOptions
{
ForwardCloudEventPropertiesAsHeaders = true
});

app.UseAuthorization();

Expand Down
9 changes: 9 additions & 0 deletions src/Dapr.AspNetCore/CloudEventPropertyNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Dapr
{
internal static class CloudEventPropertyNames
{
public const string Data = "data";
public const string DataContentType = "datacontenttype";
public const string DataBase64 = "data_base64";
}
}
116 changes: 99 additions & 17 deletions src/Dapr.AspNetCore/CloudEventsMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace Dapr
{
using System;
Expand All @@ -27,6 +30,15 @@ namespace Dapr
internal class CloudEventsMiddleware
{
private const string ContentType = "application/cloudevents+json";

// These cloudevent properties are either containing the body of the message or
// are included in the headers by other components of Dapr earlier in the pipeline
private static readonly string[] ExcludedPropertiesFromHeaders =
IliasP91 marked this conversation as resolved.
Show resolved Hide resolved
{
CloudEventPropertyNames.DataContentType, CloudEventPropertyNames.Data,
CloudEventPropertyNames.DataBase64, "pubsubname", "traceparent"
};

private readonly RequestDelegate next;
private readonly CloudEventsMiddlewareOptions options;

Expand All @@ -52,7 +64,7 @@ public Task InvokeAsync(HttpContext httpContext)
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!this.MatchesContentType(httpContext, out var charSet))
if (!MatchesContentType(httpContext, out var charSet))
IliasP91 marked this conversation as resolved.
Show resolved Hide resolved
{
return this.next(httpContext);
}
Expand All @@ -69,7 +81,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
else
{
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
using (var reader =
new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
{
var text = await reader.ReadToEndAsync();
json = JsonSerializer.Deserialize<JsonElement>(text);
Expand All @@ -83,17 +96,43 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
string contentType;

// Check whether to use data or data_base64 as per https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md#31-handling-of-data
var isDataSet = json.TryGetProperty("data", out var data);
var isBinaryDataSet = json.TryGetProperty("data_base64", out var binaryData);
// Get the property names by OrdinalIgnoreCase comparison to support case insensitive JSON as the Json Serializer for AspCore already supports it by default.
var jsonPropNames = json.EnumerateObject().ToArray();

var dataPropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d => d.Equals(CloudEventPropertyNames.Data, StringComparison.OrdinalIgnoreCase));

var dataBase64PropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d =>
d.Equals(CloudEventPropertyNames.DataBase64, StringComparison.OrdinalIgnoreCase));

var isDataSet = false;
var isBinaryDataSet = false;
JsonElement data = default;

if (dataPropName != null)
{
isDataSet = true;
data = json.TryGetProperty(dataPropName, out var dataJsonElement) ? dataJsonElement : data;
}

if (dataBase64PropName != null)
{
isBinaryDataSet = true;
data = json.TryGetProperty(dataBase64PropName, out var dataJsonElement) ? dataJsonElement : data;
}

if (isDataSet && isBinaryDataSet)
{
httpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
return;
}
else if (isDataSet)

if (isDataSet)
{
contentType = this.GetDataContentType(json, out var isJson);
contentType = GetDataContentType(json, out var isJson);

// If the value is anything other than a JSON string, treat it as JSON. Cloud Events requires
// non-JSON text to be enclosed in a JSON string.
Expand All @@ -109,8 +148,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
// Rehydrate body from contents of the string
var text = data.GetString();
using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
writer.Write(text);
await using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
await writer.WriteAsync(text);
}

body.Seek(0L, SeekOrigin.Begin);
Expand All @@ -120,17 +159,19 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
// As per the spec, if the implementation determines that the type of data is Binary,
// the value MUST be represented as a JSON string expression containing the Base64 encoded
// binary value, and use the member name data_base64 to store it inside the JSON object.
var decodedBody = binaryData.GetBytesFromBase64();
var decodedBody = data.GetBytesFromBase64();
body = new MemoryStream(decodedBody);
body.Seek(0L, SeekOrigin.Begin);
contentType = this.GetDataContentType(json, out _);
contentType = GetDataContentType(json, out _);
}
else
{
body = new MemoryStream();
contentType = null;
}

ForwardCloudEventPropertiesAsHeaders(httpContext, jsonPropNames);

originalBody = httpContext.Request.Body;
originalContentType = httpContext.Request.ContentType;

Expand All @@ -148,16 +189,57 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
}

private string GetDataContentType(JsonElement json, out bool isJson)
private void ForwardCloudEventPropertiesAsHeaders(
HttpContext httpContext,
IEnumerable<JsonProperty> jsonPropNames)
{
if (!options.ForwardCloudEventPropertiesAsHeaders)
{
return;
}

var filteredPropertyNames = jsonPropNames
.Where(d => !ExcludedPropertiesFromHeaders.Contains(d.Name, StringComparer.OrdinalIgnoreCase));

if (options.IncludedCloudEventPropertiesAsHeaders != null)
{
filteredPropertyNames = filteredPropertyNames
.Where(d => options.IncludedCloudEventPropertiesAsHeaders
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
}
else if (options.ExcludedCloudEventPropertiesFromHeaders != null)
{
filteredPropertyNames = filteredPropertyNames
.Where(d => !options.ExcludedCloudEventPropertiesFromHeaders
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
}

foreach (var jsonProperty in filteredPropertyNames)
{
httpContext.Request.Headers.TryAdd($"Cloudevent.{jsonProperty.Name.ToLowerInvariant()}",
IliasP91 marked this conversation as resolved.
Show resolved Hide resolved
jsonProperty.Value.GetRawText().Trim('\"'));
}
}

private static string GetDataContentType(JsonElement json, out bool isJson)
{
var dataContentTypePropName = json
.EnumerateObject()
.Select(d => d.Name)
.FirstOrDefault(d =>
d.Equals(CloudEventPropertyNames.DataContentType,
StringComparison.OrdinalIgnoreCase));

string contentType;
if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String &&
MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))

if (dataContentTypePropName != null
&& json.TryGetProperty(dataContentTypePropName, out var dataContentType)
&& dataContentType.ValueKind == JsonValueKind.String
&& MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))
{
contentType = dataContentType.GetString();
isJson =
parsed.MediaType.Equals( "application/json", StringComparison.Ordinal) ||
isJson =
parsed.MediaType.Equals("application/json", StringComparison.Ordinal) ||
parsed.Suffix.EndsWith("+json", StringComparison.Ordinal);

// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
Expand All @@ -179,7 +261,7 @@ private string GetDataContentType(JsonElement json, out bool isJson)
return contentType;
}

private bool MatchesContentType(HttpContext httpContext, out string charSet)
private static bool MatchesContentType(HttpContext httpContext, out string charSet)
{
if (httpContext.Request.ContentType == null)
{
Expand Down
Loading
Loading