Skip to content

Commit

Permalink
forward cloudevent props
Browse files Browse the repository at this point in the history
  • Loading branch information
IliasP91 committed Sep 25, 2023
1 parent 99d874a commit 1c3a2a4
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Linq;

namespace ControllerSample.Controllers
{
using System;
Expand Down Expand Up @@ -43,6 +45,7 @@ public SampleController(ILogger<SampleController> logger)
/// State store name.
/// </summary>
public const string StoreName = "statestore";

private readonly ILogger<SampleController> logger;

/// <summary>
Expand Down Expand Up @@ -72,6 +75,11 @@ public ActionResult<Account> Get([FromState(StoreName)] StateEntry<Account> acco
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
// Example reading cloudevent properties from the headers
var headerEntries = Request.Headers.Aggregate("", (current, header) => current + ($"------- Header: {header.Key} : {header.Value}" + Environment.NewLine));

logger.LogInformation(headerEntries);

logger.LogInformation("Enter deposit");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
Expand All @@ -83,7 +91,7 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
}

state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance);
logger.LogInformation("Balance for Id {0} is {1}", state.Value.Id, state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
Expand All @@ -98,22 +106,23 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit", 500, 2000)]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody]
BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter bulk deposit");

List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();

foreach (var entry in bulkMessage.Entries)
{
{
try
{
var transaction = entry.Event.Data;

var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
Expand All @@ -124,12 +133,16 @@ public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
entries.Add(
new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}

return new BulkSubscribeAppResponse(entries);
}

Expand Down Expand Up @@ -165,6 +178,7 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
{
return this.NotFound();
}

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
Expand All @@ -185,7 +199,8 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "withdraw", "event.type ==\"withdraw.v2\"", 1)]
[HttpPost("withdraw.v2")]
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter withdraw.v2");
if (transaction.Channel == "mobile" && transaction.Amount > 10000)
Expand Down Expand Up @@ -214,12 +229,15 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "rawDeposit", true)]
[HttpPost("rawDeposit")]
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction,
[FromServices] DaprClient daprClient)
{
var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString();
logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
logger.LogInformation(
$"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
var transactionJson = JsonSerializer.Deserialize<JsonDocument>(Convert.FromBase64String(transactionString));
var transaction = JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var transaction =
JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
Expand All @@ -239,7 +257,8 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
/// </summary>
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
public async Task<ActionResult<Account>> ThrowException(Transaction transaction,
[FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter ThrowException");
var task = Task.Delay(10);
Expand Down
7 changes: 6 additions & 1 deletion examples/AspNetCore/ControllerSample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr;

namespace ControllerSample
{
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -61,7 +63,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)

app.UseRouting();

app.UseCloudEvents();
app.UseCloudEvents(new CloudEventsMiddlewareOptions
{
ForwardCloudEventPropertiesAsHeaders = true
});

app.UseAuthorization();

Expand Down
79 changes: 64 additions & 15 deletions src/Dapr.AspNetCore/CloudEventsMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace Dapr
{
using System;
Expand All @@ -27,6 +30,14 @@ namespace Dapr
internal class CloudEventsMiddleware
{
private const string ContentType = "application/cloudevents+json";

// These cloudevent properties are either containing the body of the message or
// are included in the headers by other components of Dapr earlier in the pipeline
private static readonly string[] ExcludedPropertiesFromHeaders =
{
"datacontenttype", "data", "data_base64", "pubsubname", "traceparent"
};

private readonly RequestDelegate next;
private readonly CloudEventsMiddlewareOptions options;

Expand All @@ -52,7 +63,7 @@ public Task InvokeAsync(HttpContext httpContext)
// The philosophy here is that we don't report an error for things we don't support, because
// that would block someone from implementing their own support for it. We only report an error
// when something we do support isn't correct.
if (!this.MatchesContentType(httpContext, out var charSet))
if (!MatchesContentType(httpContext, out var charSet))
{
return this.next(httpContext);
}
Expand All @@ -69,7 +80,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
else
{
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
using (var reader =
new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
{
var text = await reader.ReadToEndAsync();
json = JsonSerializer.Deserialize<JsonElement>(text);
Expand All @@ -83,17 +95,29 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
string contentType;

// Check whether to use data or data_base64 as per https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md#31-handling-of-data
var isDataSet = json.TryGetProperty("data", out var data);
var isBinaryDataSet = json.TryGetProperty("data_base64", out var binaryData);
// Get the property names by OrdinalIgnoreCase comparison to support case insensitive JSON as the Json Serializer for AspCore already supports it by default.
var jsonPropNames = json.EnumerateObject().ToArray();

var dataPropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d => d.Equals("data", StringComparison.OrdinalIgnoreCase)) ?? "";

var dataBase64PropName = jsonPropNames
.Select(d => d.Name)
.FirstOrDefault(d => d.Equals("data_base64", StringComparison.OrdinalIgnoreCase)) ?? "";

var isDataSet = json.TryGetProperty(dataPropName, out var data);
var isBinaryDataSet = json.TryGetProperty(dataBase64PropName, out var binaryData);

if (isDataSet && isBinaryDataSet)
{
httpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
return;
}
else if (isDataSet)

if (isDataSet)
{
contentType = this.GetDataContentType(json, out var isJson);
contentType = GetDataContentType(json, out var isJson);

// If the value is anything other than a JSON string, treat it as JSON. Cloud Events requires
// non-JSON text to be enclosed in a JSON string.
Expand All @@ -109,8 +133,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
{
// Rehydrate body from contents of the string
var text = data.GetString();
using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
writer.Write(text);
await using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
await writer.WriteAsync(text);
}

body.Seek(0L, SeekOrigin.Begin);
Expand All @@ -123,14 +147,16 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
var decodedBody = binaryData.GetBytesFromBase64();
body = new MemoryStream(decodedBody);
body.Seek(0L, SeekOrigin.Begin);
contentType = this.GetDataContentType(json, out _);
contentType = GetDataContentType(json, out _);
}
else
{
body = new MemoryStream();
contentType = null;
}

ForwardCloudEventPropertiesAsHeaders(httpContext, jsonPropNames);

originalBody = httpContext.Request.Body;
originalContentType = httpContext.Request.ContentType;

Expand All @@ -148,16 +174,39 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
}
}

private string GetDataContentType(JsonElement json, out bool isJson)
private void ForwardCloudEventPropertiesAsHeaders(
HttpContext httpContext,
IEnumerable<JsonProperty> jsonPropNames)
{
if (!options.ForwardCloudEventPropertiesAsHeaders)
{
return;
}

foreach (var jsonProperty in jsonPropNames
.Where(d => !ExcludedPropertiesFromHeaders.Contains(d.Name.ToLowerInvariant())))
{
httpContext.Request.Headers.TryAdd($"Cloudevent.{jsonProperty.Name.ToLowerInvariant()}",
jsonProperty.Value.GetRawText().Trim('\"'));
}
}

private static string GetDataContentType(JsonElement json, out bool isJson)
{
var dataContentTypePropName = json.EnumerateObject()
.Select(d => d.Name)
.FirstOrDefault(d =>
d.Equals("datacontenttype", StringComparison.OrdinalIgnoreCase))
?? "";
string contentType;
if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String &&

if (json.TryGetProperty(dataContentTypePropName, out var dataContentType) &&
dataContentType.ValueKind == JsonValueKind.String &&
MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))
{
contentType = dataContentType.GetString();
isJson =
parsed.MediaType.Equals( "application/json", StringComparison.Ordinal) ||
isJson =
parsed.MediaType.Equals("application/json", StringComparison.Ordinal) ||
parsed.Suffix.EndsWith("+json", StringComparison.Ordinal);

// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
Expand All @@ -179,7 +228,7 @@ private string GetDataContentType(JsonElement json, out bool isJson)
return contentType;
}

private bool MatchesContentType(HttpContext httpContext, out string charSet)
private static bool MatchesContentType(HttpContext httpContext, out string charSet)
{
if (httpContext.Request.ContentType == null)
{
Expand Down
13 changes: 12 additions & 1 deletion src/Dapr.AspNetCore/CloudEventsMiddlewareOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,20 @@ public class CloudEventsMiddlewareOptions
/// instead of the expected JSON-decoded value of <c>Hello, "world!"</c>.
/// </para>
/// <para>
/// Setting this property to <c>true</c> restores the previous invalid behavior for compatiblity.
/// Setting this property to <c>true</c> restores the previous invalid behavior for compatibility.
/// </para>
/// </remarks>
public bool SuppressJsonDecodingOfTextPayloads { get; set; }

/// <summary>
/// Gets or sets a value that will determine whether the CloudEvent properties will be forwarded as Request Headers.
/// </summary>
/// <remarks>
/// <para>
/// Setting this property to <c>true</c> will forward the CloudEvent properties as Request Headers in the following format.
/// ie. A CloudEvent property <c>"type": "Example.Type"</c> will be added as <c>"Cloudevent.type": "Example.Type"</c> request header.
/// </para>
/// </remarks>
public bool ForwardCloudEventPropertiesAsHeaders { get; set; }
}
}
Loading

0 comments on commit 1c3a2a4

Please sign in to comment.