Skip to content

Commit

Permalink
Big fix for erroneously double playing messages when running with Sol…
Browse files Browse the repository at this point in the history
…o durability
  • Loading branch information
jeremydmiller committed Sep 1, 2024
1 parent 5cb7f00 commit d1c99b4
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\Transports\RabbitMQ\Wolverine.RabbitMQ\Wolverine.RabbitMQ.csproj" />
<ProjectReference Include="..\Wolverine.Marten\Wolverine.Marten.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\Servers.cs">
<Link>Servers.cs</Link>
</Compile>
</ItemGroup>

</Project>
114 changes: 114 additions & 0 deletions src/Persistence/DuplicateMessageSending/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System.Collections.Concurrent;
using IntegrationTests;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Oakton;
using Oakton.Resources;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Postgresql;
using Wolverine.RabbitMQ;

// Postgres if true, Rabbit if false
var usePostgres = false;

await Host.CreateDefaultBuilder(args)
.UseWolverine(opts =>
{
if (usePostgres)
{
opts.PublishMessage<Ping>().ToPostgresqlQueue("ping_queue");
opts.PublishMessage<Pong>().ToPostgresqlQueue("pong_queue");
opts.ListenToPostgresqlQueue("ping_queue");
opts.ListenToPostgresqlQueue("pong_queue");
}
else
{
opts.UseRabbitMq(new Uri("amqp://localhost"));
opts.PublishMessage<Ping>().ToRabbitQueue("ping_queue");
opts.PublishMessage<Pong>().ToRabbitQueue("pong_queue");
opts.ListenToRabbitQueue("ping_queue")
.PreFetchCount(10)
.ListenerCount(5)
.UseDurableInbox(); // Remove durable inbox and the problem goes away
opts.ListenToRabbitQueue("pong_queue")
.PreFetchCount(10)
.ListenerCount(5)
.ProcessInline();
}
opts.Services.AddResourceSetupOnStartup();
opts.Durability.Mode = DurabilityMode.Solo;
})
.ConfigureServices(services =>
{
services.AddHostedService<Sender>();
services.AddMarten(opts =>
{
opts.Connection(Servers.PostgresConnectionString);
opts.DisableNpgsqlLogging = true;
})
.IntegrateWithWolverine("public", transportSchemaName: "public")
.ApplyAllDatabaseChangesOnStartup();
})
.UseResourceSetupOnStartup()
.RunOaktonCommands(args);


public class Sender(ILogger<Sender> logger, IMessageBus bus) : BackgroundService
{
static ConcurrentDictionary<string, bool> _sentMessages = new();

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var uid = Random.Shared.Next(0, 1_000_000); // to avoid any previous message from an earlier run to interfere with this run
for (var i = 0; i < 200; i++)
{
var pingNumber = $"{uid} - {i}";

if (!_sentMessages.TryAdd(pingNumber, true))
{
logger.LogError("DUPLICATE PING GOING OUT WITH NUMBER " + pingNumber);
}

logger.LogInformation("Sending Ping #{Number}", pingNumber);
await bus.PublishAsync(new Ping(pingNumber));
}
}
}


public class MessageHandler
{
private static ConcurrentDictionary<string, int> _receivedMessages = new();


public Pong Handle(Ping ping, ILogger<MessageHandler> logger, Envelope envelope)
{
//_tracker.Track(envelope);

logger.LogInformation("Received Ping #{Number}", ping.Id);
Thread.Sleep(1000); // await Task.Delay(1000); produces same result. I use Thread.Sleep to exclude potential async/await issues
return new Pong(ping.Id);
}

public static void Handle(Pong pong, ILogger<MessageHandler> logger, Envelope envelope)
{
if (_receivedMessages.TryAdd(pong.Id, 1))
{
logger.LogInformation("Received Pong #{Number}", pong.Id);
}
else
{
logger.LogError("Received Pong #{Number} as Duplicate with Id {Id} and attempt {Attempt}", pong.Id, envelope.Id, envelope.Attempts);
}
}
}

public record Ping(string Id);

public record Pong(string Id);
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal static Uri ToUri(string name, string? databaseName)
private bool _hasInitialized;
private IPostgresqlQueueSender? _sender;
private ImHashMap<string, bool> _checkedDatabases = ImHashMap<string, bool>.Empty;
private readonly Lazy<QueueTable> _queueTable;
private readonly Lazy<ScheduledMessageTable> _scheduledMessageTable;

public PostgresqlQueue(string name, PostgresqlTransport parent, EndpointRole role = EndpointRole.Application,
string? databaseName = null) :
Expand All @@ -35,17 +37,19 @@ public PostgresqlQueue(string name, PostgresqlTransport parent, EndpointRole rol
Name = name;
EndpointName = name;

QueueTable = new QueueTable(Parent, queueTableName);
ScheduledTable = new ScheduledMessageTable(Parent, scheduledTableName);
// Gotta be lazy so the schema names get set
_queueTable = new Lazy<QueueTable>(() => new QueueTable(Parent, queueTableName));
_scheduledMessageTable =
new Lazy<ScheduledMessageTable>(() => new ScheduledMessageTable(Parent, scheduledTableName));
}

public string Name { get; }

internal PostgresqlTransport Parent { get; }

internal Table QueueTable { get; private set; }
internal Table QueueTable => _queueTable.Value;

internal Table ScheduledTable { get; private set; }
internal Table ScheduledTable => _scheduledMessageTable.Value;

protected override bool supportsMode(EndpointMode mode)
{
Expand Down
11 changes: 7 additions & 4 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,13 @@ public Task StartAsync(CancellationToken cancellationToken)
_runningBlock.Post(batch);
}, _settings, recoveryStart, _settings.ScheduledJobPollingTime);

_reassignmentTimer =
new Timer(
_ => { _runningBlock.Post(new ReassignDormantNodes(_runtime.Storage.Nodes, _database)); },
_settings, _settings.FirstNodeReassignmentExecution, _settings.NodeReassignmentPollingTime);
if (_settings.DurabilityAgentEnabled && _settings.Mode == DurabilityMode.Balanced)
{
_reassignmentTimer =
new Timer(
_ => { _runningBlock.Post(new ReassignDormantNodes(_runtime.Storage.Nodes, _database)); },
_settings, _settings.FirstNodeReassignmentExecution, _settings.NodeReassignmentPollingTime);
}

if (AutoStartScheduledJobPolling)
{
Expand Down
4 changes: 3 additions & 1 deletion src/Persistence/Wolverine.RDBMS/MessageDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,13 @@ public async Task ReleaseIncomingAsync(int ownerId)
{
if (_cancellation.IsCancellationRequested) return;

await _dataSource
var count = await _dataSource
.CreateCommand(
$"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @owner")
.With("owner", ownerId)
.ExecuteNonQueryAsync(_cancellation);

Logger.LogInformation("Reassigned {Count} incoming messages from {Owner} to any node in the durable inbox", count, ownerId);
}

public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt)
Expand Down
7 changes: 7 additions & 0 deletions wolverine.sln
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MartenTests", "src\Persiste
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgresqlTests", "src\Persistence\PostgresqlTests\PostgresqlTests.csproj", "{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DuplicateMessageSending", "src\Persistence\DuplicateMessageSending\DuplicateMessageSending.csproj", "{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -592,6 +594,10 @@ Global
{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Release|Any CPU.Build.0 = Release|Any CPU
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{24497E6A-D6B1-4C80-ABFB-57FFAD5070C4} = {96119B5E-B5F0-400A-9580-B342EBE26212}
Expand Down Expand Up @@ -696,5 +702,6 @@ Global
{F3ECE3DC-153D-4F8A-AC48-2396F2B46ED7} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{D01C99ED-B735-4854-A07D-B55E506A1441} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
EndGlobalSection
EndGlobal

0 comments on commit d1c99b4

Please sign in to comment.