Skip to content

Commit

Permalink
Support case insensitive cloudevent payloads and forward cloudevent p…
Browse files Browse the repository at this point in the history
…rops s headers (dapr#1153)

* forward cloudevent props
Signed-off-by: Ilias Politsopoulos <[email protected]>

* refactor middleware
Signed-off-by: Ilias Politsopoulos <[email protected]>

* add cloud event property filters
Signed-off-by: Ilias Politsopoulos <[email protected]>

* update string check
Signed-off-by: Ilias Politsopoulos <[email protected]>

* forward cloudevent props
Signed-off-by: Ilias Politsopoulos <[email protected]>

* refactor middleware
Signed-off-by: Ilias Politsopoulos <[email protected]>

* add cloud event property filters
Signed-off-by: Ilias Politsopoulos <[email protected]>

* update checks
Signed-off-by: Ilias Politsopoulos <[email protected]>

---------

Signed-off-by: Whit Waldo <[email protected]>
Co-authored-by: Whit Waldo <[email protected]>
  • Loading branch information
2 people authored and humandigital-michiel committed Oct 23, 2024
1 parent d30909e commit c814042
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Linq;

namespace ControllerSample.Controllers
{
using System;
Expand Down Expand Up @@ -43,6 +45,7 @@ public SampleController(ILogger<SampleController> logger)
/// State store name.
/// </summary>
public const string StoreName = "statestore";

private readonly ILogger<SampleController> logger;

/// <summary>
Expand Down Expand Up @@ -72,6 +75,11 @@ public ActionResult<Account> Get([FromState(StoreName)] StateEntry<Account> acco
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
// Example reading cloudevent properties from the headers
var headerEntries = Request.Headers.Aggregate("", (current, header) => current + ($"------- Header: {header.Key} : {header.Value}" + Environment.NewLine));

logger.LogInformation(headerEntries);

logger.LogInformation("Enter deposit");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
Expand All @@ -83,7 +91,7 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
}

state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance);
logger.LogInformation("Balance for Id {0} is {1}", state.Value.Id, state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
Expand All @@ -98,22 +106,23 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit", 500, 2000)]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody]
BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter bulk deposit");

List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();

foreach (var entry in bulkMessage.Entries)
{
{
try
{
var transaction = entry.Event.Data;

var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
Expand All @@ -124,12 +133,16 @@ public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
entries.Add(
new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}

return new BulkSubscribeAppResponse(entries);
}

Expand Down Expand Up @@ -165,6 +178,7 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
{
return this.NotFound();
}

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
Expand All @@ -185,7 +199,8 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "withdraw", "event.type ==\"withdraw.v2\"", 1)]
[HttpPost("withdraw.v2")]
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter withdraw.v2");
if (transaction.Channel == "mobile" && transaction.Amount > 10000)
Expand Down Expand Up @@ -214,12 +229,15 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "rawDeposit", true)]
[HttpPost("rawDeposit")]
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction,
[FromServices] DaprClient daprClient)
{
var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString();
logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
logger.LogInformation(
$"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
var transactionJson = JsonSerializer.Deserialize<JsonDocument>(Convert.FromBase64String(transactionString));
var transaction = JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var transaction =
JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
Expand All @@ -239,7 +257,8 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
/// </summary>
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> ThrowException(Transaction transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter ThrowException");
var task = Task.Delay(10);
Expand Down
8 changes: 7 additions & 1 deletion examples/AspNetCore/ControllerSample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
// limitations under the License.
// ------------------------------------------------------------------------


using Dapr;
using Dapr.AspNetCore;


namespace ControllerSample
{
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -63,7 +66,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

app.UseRouting();

app.UseCloudEvents();
app.UseCloudEvents(new CloudEventsMiddlewareOptions
{
ForwardCloudEventPropertiesAsHeaders = true
});

app.UseAuthorization();

Expand Down
9 changes: 9 additions & 0 deletions src/Dapr.AspNetCore/CloudEventPropertyNames.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Dapr
{
internal static class CloudEventPropertyNames
{
public const string Data = "data";
public const string DataContentType = "datacontenttype";
public const string DataBase64 = "data_base64";
}
}
116 changes: 99 additions & 17 deletions src/Dapr.AspNetCore/CloudEventsMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace Dapr
{
using System;
Expand All @@ -27,6 +30,15 @@ namespace Dapr
internal class CloudEventsMiddleware
{
private const string ContentType = "application/cloudevents+json";

// These cloudevent properties are either containing the body of the message or
// are included in the headers by other components of Dapr earlier in the pipeline
private static readonly string[] ExcludedPropertiesFromHeaders =
{
CloudEventPropertyNames.DataContentType, CloudEventPropertyNames.Data,
CloudEventPropertyNames.DataBase64, "pubsubname", "traceparent"
};

private readonly RequestDelegate next;
private readonly CloudEventsMiddlewareOptions options;

Expand All @@ -52,7 +64,7 @@ public Task InvokeAsync(HttpContext httpContext)
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!this.MatchesContentType(httpContext, out var charSet))
if (!MatchesContentType(httpContext, out var charSet))
{
return this.next(httpContext);
}
Expand All @@ -69,7 +81,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
else
{
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
using (var reader =
new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
{
var text = await reader.ReadToEndAsync();
json = JsonSerializer.Deserialize<JsonElement>(text);
Expand All @@ -83,17 +96,43 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
string contentType;

// Check whether to use data or data_base64 as per https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md#31-handling-of-data
var isDataSet = json.TryGetProperty("data", out var data);
var isBinaryDataSet = json.TryGetProperty("data_base64", out var binaryData);
// Get the property names by OrdinalIgnoreCase comparison to support case insensitive JSON as the Json Serializer for AspCore already supports it by default.
var jsonPropNames = json.EnumerateObject().ToArray();

var dataPropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d => d.Equals(CloudEventPropertyNames.Data, StringComparison.OrdinalIgnoreCase));

var dataBase64PropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d =>
d.Equals(CloudEventPropertyNames.DataBase64, StringComparison.OrdinalIgnoreCase));

var isDataSet = false;
var isBinaryDataSet = false;
JsonElement data = default;

if (dataPropName != null)
{
isDataSet = true;
data = json.TryGetProperty(dataPropName, out var dataJsonElement) ? dataJsonElement : data;
}

if (dataBase64PropName != null)
{
isBinaryDataSet = true;
data = json.TryGetProperty(dataBase64PropName, out var dataJsonElement) ? dataJsonElement : data;
}

if (isDataSet && isBinaryDataSet)
{
httpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
return;
}
else if (isDataSet)

if (isDataSet)
{
contentType = this.GetDataContentType(json, out var isJson);
contentType = GetDataContentType(json, out var isJson);

// If the value is anything other than a JSON string, treat it as JSON. Cloud Events requires
// non-JSON text to be enclosed in a JSON string.
Expand All @@ -109,8 +148,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
// Rehydrate body from contents of the string
var text = data.GetString();
using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
writer.Write(text);
await using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
await writer.WriteAsync(text);
}

body.Seek(0L, SeekOrigin.Begin);
Expand All @@ -120,17 +159,19 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
// As per the spec, if the implementation determines that the type of data is Binary,
// the value MUST be represented as a JSON string expression containing the Base64 encoded
// binary value, and use the member name data_base64 to store it inside the JSON object.
var decodedBody = binaryData.GetBytesFromBase64();
var decodedBody = data.GetBytesFromBase64();
body = new MemoryStream(decodedBody);
body.Seek(0L, SeekOrigin.Begin);
contentType = this.GetDataContentType(json, out _);
contentType = GetDataContentType(json, out _);
}
else
{
body = new MemoryStream();
contentType = null;
}

ForwardCloudEventPropertiesAsHeaders(httpContext, jsonPropNames);

originalBody = httpContext.Request.Body;
originalContentType = httpContext.Request.ContentType;

Expand All @@ -148,16 +189,57 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
}

private string GetDataContentType(JsonElement json, out bool isJson)
private void ForwardCloudEventPropertiesAsHeaders(
HttpContext httpContext,
IEnumerable<JsonProperty> jsonPropNames)
{
if (!options.ForwardCloudEventPropertiesAsHeaders)
{
return;
}

var filteredPropertyNames = jsonPropNames
.Where(d => !ExcludedPropertiesFromHeaders.Contains(d.Name, StringComparer.OrdinalIgnoreCase));

if (options.IncludedCloudEventPropertiesAsHeaders != null)
{
filteredPropertyNames = filteredPropertyNames
.Where(d => options.IncludedCloudEventPropertiesAsHeaders
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
}
else if (options.ExcludedCloudEventPropertiesFromHeaders != null)
{
filteredPropertyNames = filteredPropertyNames
.Where(d => !options.ExcludedCloudEventPropertiesFromHeaders
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
}

foreach (var jsonProperty in filteredPropertyNames)
{
httpContext.Request.Headers.TryAdd($"Cloudevent.{jsonProperty.Name.ToLowerInvariant()}",
jsonProperty.Value.GetRawText().Trim('\"'));
}
}

private static string GetDataContentType(JsonElement json, out bool isJson)
{
var dataContentTypePropName = json
.EnumerateObject()
.Select(d => d.Name)
.FirstOrDefault(d =>
d.Equals(CloudEventPropertyNames.DataContentType,
StringComparison.OrdinalIgnoreCase));

string contentType;
if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String &&
MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))

if (dataContentTypePropName != null
&& json.TryGetProperty(dataContentTypePropName, out var dataContentType)
&& dataContentType.ValueKind == JsonValueKind.String
&& MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))
{
contentType = dataContentType.GetString();
isJson =
parsed.MediaType.Equals( "application/json", StringComparison.Ordinal) ||
isJson =
parsed.MediaType.Equals("application/json", StringComparison.Ordinal) ||
parsed.Suffix.EndsWith("+json", StringComparison.Ordinal);

// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
Expand All @@ -179,7 +261,7 @@ private string GetDataContentType(JsonElement json, out bool isJson)
return contentType;
}

private bool MatchesContentType(HttpContext httpContext, out string charSet)
private static bool MatchesContentType(HttpContext httpContext, out string charSet)
{
if (httpContext.Request.ContentType == null)
{
Expand Down
Loading

0 comments on commit c814042

Please sign in to comment.