Skip to content

Commit

Permalink
Allow V3.0.1 to be compatible with V2.6 Return messages.
Browse files Browse the repository at this point in the history
Closes #236
  • Loading branch information
shlomii committed Mar 22, 2012
1 parent 812299e commit 97c562f
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 30 deletions.
10 changes: 10 additions & 0 deletions src/core/NServiceBus/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,15 @@ public static class Headers
/// Header telling the NServiceBus Version (beginning NServiceBus V3.0.1).
/// </summary>
public const string NServiceBusVersion = "NServiceBus.Version";

/// <summary>
/// Used in a header when doing a callback (bus.return)
/// </summary>
public const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode";

/// <summary>
/// Header that tells if this transport message is a control message
/// </summary>
public const string ControlMessageHeader = "NServiceBus.ControlMessage";
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
// Completion message is used by a V3.X subsciber with a 2.6 publisher.
// Completion message is used by a V3.X subscriber with a 2.6 publisher.
// Do no change the namespace namespace

namespace NServiceBus.Unicast.Transport
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Globalization;
using Common.Logging;
using NServiceBus.Config;
using NServiceBus.MessageMutator;
using NServiceBus.Unicast.Transport;

namespace NServiceBus.Unicast.BackwardCompatibility
{
/// <summary>
/// If this is a V26 message, extract completion message return error code and place it in the transport headers
/// </summary>
public class IncomingReturnMessageMutator : IMutateIncomingMessages, INeedInitialization
{
/// <summary>
/// Reference to the BUS to get a hold of the current TransportMessage
/// </summary>
public IBus Bus { get; set; }

/// <summary>
/// If this is a completion message from a 2.6 sender, copy the error code.
/// </summary>
/// <param name="message">Message to copy ErrorCode from.</param>
/// <returns>Same message as received.</returns>
public object MutateIncoming(object message)
{
var completionMessage = message as CompletionMessage;
if (completionMessage == null)
return message;

if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
Bus.CurrentMessageContext.Headers.Add(Headers.ReturnMessageErrorCodeHeader,
completionMessage.ErrorCode.ToString(CultureInfo.InvariantCulture));

//Change to Transport to be a Control Message so no need to find a handler for that.
if(!Bus.CurrentMessageContext.Headers.ContainsKey(Headers.ControlMessageHeader))
Bus.CurrentMessageContext.Headers.Add(Headers.ControlMessageHeader, true.ToString(CultureInfo.InvariantCulture));

return message;
}

/// <summary>
/// Register the IncomingReturnMessageMutator
/// </summary>
public void Init()
{
Configure.Instance.Configurer.ConfigureComponent<IncomingReturnMessageMutator>(DependencyLifecycle.InstancePerCall);
Log.Debug("Configured IncomingReturnMessageMutator");
}

private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

namespace NServiceBus.Unicast.BackwardCompatibility
{
class MutateTransportIncomingSubscriptionMessages : IMutateIncomingTransportMessages, INeedInitialization
class IncomingSubscriptionMessageMutator : IMutateIncomingTransportMessages, INeedInitialization
{
/// <summary>
/// Re-Adjust V3.0.0 subscribe & UnSubscribe messages. Version 3.0.0 Subs/Unsubs/Publish no NServiceBus.Version set it the headers.
/// Re-Adjust V3.0.0 subscribe and unsubscribe messages.
/// Version 3.0.0 subscribe and unsubscribe message have no NServiceBus.Version set it the headers.
/// Version 3.0.0 Send message have it with "3.0.0" set as value.
/// Do nothing If it is a V2.6 message (contains EnclosedMessageTypes key).
/// </summary>
/// <param name="transportMessage"></param>
Expand All @@ -24,14 +26,14 @@ public void MutateIncoming(TransportMessage transportMessage)
}

/// <summary>
/// Register the MutateTransportIncomingSubscriptionMessages mutator
/// Register the IncomingSubscriptionMessageMutator mutator
/// </summary>
public void Init()
{
Configure.Instance.Configurer.ConfigureComponent<MutateTransportIncomingSubscriptionMessages>(DependencyLifecycle.InstancePerCall);
Log.Debug("Configured Transport Incoming Message Mutator: MutateTransportIncomingSubscriptionMessages");
Configure.Instance.Configurer.ConfigureComponent<IncomingSubscriptionMessageMutator>(DependencyLifecycle.InstancePerCall);
Log.Debug("Configured Transport Incoming Message Mutator: IncomingSubscriptionMessageMutator");
}

private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportIncomingSubscriptionMessages));
private readonly static ILog Log = LogManager.GetLogger(typeof(IncomingSubscriptionMessageMutator));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@
</ItemGroup>
<ItemGroup>
<Compile Include="CompletionMessage.cs" />
<Compile Include="MutateTransportIncomingSubscriptionMessages.cs" />
<Compile Include="MutateTransportOutgoingSubscriptionMessages.cs" />
<Compile Include="IncomingReturnMessageMutator.cs" />
<Compile Include="IncomingSubscriptionMessageMutator.cs" />
<Compile Include="OutgoingSubscriptionMessageMutator.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.IO;
using System.Threading;
using Common.Logging;
using NServiceBus.Config;
using NServiceBus.MessageMutator;
Expand All @@ -11,7 +10,7 @@ namespace NServiceBus.Unicast.BackwardCompatibility
/// <summary>
/// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher
/// </summary>
public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransportMessages, INeedInitialization
public class OutgoingSubscriptionMessageMutator : IMutateOutgoingTransportMessages, INeedInitialization
{
/// <summary>
/// Allow for a V3.X subscriber to subscribe/unsubscribe to a V2.6 publisher
Expand All @@ -23,26 +22,32 @@ public class MutateTransportOutgoingSubscriptionMessages : IMutateOutgoingTransp
public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
{
if ((transportMessage.IsControlMessage() &&
((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) || (transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe))))
((transportMessage.MessageIntent == MessageIntentEnum.Subscribe) ||
(transportMessage.MessageIntent == MessageIntentEnum.Unsubscribe) ||
(transportMessage.MessageIntent == MessageIntentEnum.Send))))
{
var stream = new MemoryStream();
MessageSerializer.Serialize(new object[] { new CompletionMessage() }, stream);
var completionMessage = new CompletionMessage();
if (transportMessage.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
completionMessage.ErrorCode = int.Parse(transportMessage.Headers[Headers.ReturnMessageErrorCodeHeader]);

MessageSerializer.Serialize(new object[] { completionMessage }, stream);
transportMessage.Body = stream.ToArray();
Log.Debug("Added Completion message and sending message intent: " + transportMessage.MessageIntent);
}
}

/// <summary>
/// Register the MutateTransportOutgoingSubscriptionMessages mutator
/// Register the OutgoingSubscriptionMessageMutator mutator
/// </summary>
public void Init()
{
Configure.Instance.Configurer.ConfigureComponent<MutateTransportOutgoingSubscriptionMessages>(DependencyLifecycle.InstancePerCall);
Configure.Instance.Configurer.ConfigureComponent<OutgoingSubscriptionMessageMutator>(DependencyLifecycle.InstancePerCall);
}
/// <summary>
/// Gets or sets the message serializer
/// </summary>
public IMessageSerializer MessageSerializer { get; set; }
private readonly static ILog Log = LogManager.GetLogger(typeof(MutateTransportOutgoingSubscriptionMessages));
private readonly static ILog Log = LogManager.GetLogger(typeof(OutgoingSubscriptionMessageMutator));
}
}
10 changes: 5 additions & 5 deletions src/unicast/NServiceBus.Unicast/UnicastBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class UnicastBus : IUnicastBus, IStartableBus
/// </summary>
public const string SubscriptionMessageType = "SubscriptionMessageType";

private const string ReturnMessageErrorCodeHeader = "NServiceBus.ReturnMessage.ErrorCode";

#region config properties

private bool autoSubscribe = true;
Expand Down Expand Up @@ -480,10 +478,12 @@ void IBus.Return<T>(T errorCode)
{
var returnMessage = ControlMessage.Create();

returnMessage.Headers[ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString();
returnMessage.Headers[Headers.ReturnMessageErrorCodeHeader] = errorCode.GetHashCode().ToString();
returnMessage.CorrelationId = _messageBeingHandled.IdForCorrelation;
returnMessage.MessageIntent = MessageIntentEnum.Send;

InvokeOutgoingTransportMessagesMutators(new object[] { }, returnMessage);

MessageSender.Send(returnMessage, _messageBeingHandled.ReplyToAddress);
}

Expand Down Expand Up @@ -1111,8 +1111,8 @@ void HandleCorellatedMessage(TransportMessage msg, object[] messages)

var statusCode = int.MinValue;

if (msg.IsControlMessage() && msg.Headers.ContainsKey(ReturnMessageErrorCodeHeader))
statusCode = int.Parse(msg.Headers[ReturnMessageErrorCodeHeader]);
if (msg.IsControlMessage() && msg.Headers.ContainsKey(Headers.ReturnMessageErrorCodeHeader))
statusCode = int.Parse(msg.Headers[Headers.ReturnMessageErrorCodeHeader]);

busAsyncResult.Complete(statusCode, messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Collections.Generic;

/// <summary>
/// Helper for creating controll messages
/// Helper for creating control messages
/// </summary>
public static class ControlMessage
{
Expand All @@ -20,15 +20,10 @@ public static TransportMessage Create()
Recoverable = true,
MessageIntent = MessageIntentEnum.Send
};
transportMessage.Headers.Add(ControlMessageHeader, true.ToString());
transportMessage.Headers.Add(Headers.ControlMessageHeader, true.ToString());

return transportMessage;
}

/// <summary>
/// Header which tells that this transportmessage is a controll message
/// </summary>
public static string ControlMessageHeader = "NServiceBus.ControlMessage";
}

/// <summary>
Expand All @@ -37,14 +32,14 @@ public static TransportMessage Create()
public static class TransportMessageExtensions
{
/// <summary>
/// True if the transportmessage is a control message
/// True if the transport message is a control message
/// </summary>
/// <param name="transportMessage"></param>
/// <returns></returns>
public static bool IsControlMessage(this TransportMessage transportMessage)
{
return transportMessage.Headers != null &&
transportMessage.Headers.ContainsKey(ControlMessage.ControlMessageHeader);
transportMessage.Headers.ContainsKey(Headers.ControlMessageHeader);
}

}
Expand Down

0 comments on commit 97c562f

Please sign in to comment.