From d1c99b4abccb684d42dd15b89bc76d54dce86db0 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 1 Sep 2024 14:49:57 -0500 Subject: [PATCH] Big fix for erroneously double playing messages when running with Solo durability --- .../DuplicateMessageSending.csproj | 21 ++++ .../DuplicateMessageSending/Program.cs | 114 ++++++++++++++++++ .../Transport/PostgresqlQueue.cs | 12 +- .../Wolverine.RDBMS/DurabilityAgent.cs | 11 +- .../Wolverine.RDBMS/MessageDatabase.cs | 4 +- wolverine.sln | 7 ++ 6 files changed, 160 insertions(+), 9 deletions(-) create mode 100644 src/Persistence/DuplicateMessageSending/DuplicateMessageSending.csproj create mode 100644 src/Persistence/DuplicateMessageSending/Program.cs diff --git a/src/Persistence/DuplicateMessageSending/DuplicateMessageSending.csproj b/src/Persistence/DuplicateMessageSending/DuplicateMessageSending.csproj new file mode 100644 index 00000000..3b82fe12 --- /dev/null +++ b/src/Persistence/DuplicateMessageSending/DuplicateMessageSending.csproj @@ -0,0 +1,21 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + Servers.cs + + + + diff --git a/src/Persistence/DuplicateMessageSending/Program.cs b/src/Persistence/DuplicateMessageSending/Program.cs new file mode 100644 index 00000000..889ffb5a --- /dev/null +++ b/src/Persistence/DuplicateMessageSending/Program.cs @@ -0,0 +1,114 @@ +using System.Collections.Concurrent; +using IntegrationTests; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Oakton; +using Oakton.Resources; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Postgresql; +using Wolverine.RabbitMQ; + +// Postgres if true, Rabbit if false +var usePostgres = false; + +await Host.CreateDefaultBuilder(args) + .UseWolverine(opts => + { + if (usePostgres) + { + opts.PublishMessage().ToPostgresqlQueue("ping_queue"); + opts.PublishMessage().ToPostgresqlQueue("pong_queue"); + opts.ListenToPostgresqlQueue("ping_queue"); + opts.ListenToPostgresqlQueue("pong_queue"); + } + else + { + opts.UseRabbitMq(new Uri("amqp://localhost")); + opts.PublishMessage().ToRabbitQueue("ping_queue"); + opts.PublishMessage().ToRabbitQueue("pong_queue"); + opts.ListenToRabbitQueue("ping_queue") + .PreFetchCount(10) + .ListenerCount(5) + .UseDurableInbox(); // Remove durable inbox and the problem goes away + opts.ListenToRabbitQueue("pong_queue") + .PreFetchCount(10) + .ListenerCount(5) + .ProcessInline(); + } + + opts.Services.AddResourceSetupOnStartup(); + + opts.Durability.Mode = DurabilityMode.Solo; + }) + .ConfigureServices(services => + { + services.AddHostedService(); + + services.AddMarten(opts => + { + opts.Connection(Servers.PostgresConnectionString); + opts.DisableNpgsqlLogging = true; + }) + .IntegrateWithWolverine("public", transportSchemaName: "public") + .ApplyAllDatabaseChangesOnStartup(); + }) + .UseResourceSetupOnStartup() + .RunOaktonCommands(args); + + +public class Sender(ILogger logger, IMessageBus bus) : BackgroundService +{ + static ConcurrentDictionary _sentMessages = new(); + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var uid = Random.Shared.Next(0, 1_000_000); // to avoid any previous message from an earlier run to interfere with this run + for (var i = 0; i < 200; i++) + { + var pingNumber = $"{uid} - {i}"; + + if (!_sentMessages.TryAdd(pingNumber, true)) + { + logger.LogError("DUPLICATE PING GOING OUT WITH NUMBER " + pingNumber); + } + + logger.LogInformation("Sending Ping #{Number}", pingNumber); + await bus.PublishAsync(new Ping(pingNumber)); + } + } +} + + +public class MessageHandler +{ + private static ConcurrentDictionary _receivedMessages = new(); + + + public Pong Handle(Ping ping, ILogger logger, Envelope envelope) + { + //_tracker.Track(envelope); + + logger.LogInformation("Received Ping #{Number}", ping.Id); + Thread.Sleep(1000); // await Task.Delay(1000); produces same result. I use Thread.Sleep to exclude potential async/await issues + return new Pong(ping.Id); + } + + public static void Handle(Pong pong, ILogger logger, Envelope envelope) + { + if (_receivedMessages.TryAdd(pong.Id, 1)) + { + logger.LogInformation("Received Pong #{Number}", pong.Id); + } + else + { + logger.LogError("Received Pong #{Number} as Duplicate with Id {Id} and attempt {Attempt}", pong.Id, envelope.Id, envelope.Attempts); + } + } +} + +public record Ping(string Id); + +public record Pong(string Id); diff --git a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs index 268122a2..3c6d6816 100644 --- a/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs +++ b/src/Persistence/Wolverine.Postgresql/Transport/PostgresqlQueue.cs @@ -22,6 +22,8 @@ internal static Uri ToUri(string name, string? databaseName) private bool _hasInitialized; private IPostgresqlQueueSender? _sender; private ImHashMap _checkedDatabases = ImHashMap.Empty; + private readonly Lazy _queueTable; + private readonly Lazy _scheduledMessageTable; public PostgresqlQueue(string name, PostgresqlTransport parent, EndpointRole role = EndpointRole.Application, string? databaseName = null) : @@ -35,17 +37,19 @@ public PostgresqlQueue(string name, PostgresqlTransport parent, EndpointRole rol Name = name; EndpointName = name; - QueueTable = new QueueTable(Parent, queueTableName); - ScheduledTable = new ScheduledMessageTable(Parent, scheduledTableName); + // Gotta be lazy so the schema names get set + _queueTable = new Lazy(() => new QueueTable(Parent, queueTableName)); + _scheduledMessageTable = + new Lazy(() => new ScheduledMessageTable(Parent, scheduledTableName)); } public string Name { get; } internal PostgresqlTransport Parent { get; } - internal Table QueueTable { get; private set; } + internal Table QueueTable => _queueTable.Value; - internal Table ScheduledTable { get; private set; } + internal Table ScheduledTable => _scheduledMessageTable.Value; protected override bool supportsMode(EndpointMode mode) { diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs index 2a74554d..79e2ec0e 100644 --- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs +++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs @@ -95,10 +95,13 @@ public Task StartAsync(CancellationToken cancellationToken) _runningBlock.Post(batch); }, _settings, recoveryStart, _settings.ScheduledJobPollingTime); - _reassignmentTimer = - new Timer( - _ => { _runningBlock.Post(new ReassignDormantNodes(_runtime.Storage.Nodes, _database)); }, - _settings, _settings.FirstNodeReassignmentExecution, _settings.NodeReassignmentPollingTime); + if (_settings.DurabilityAgentEnabled && _settings.Mode == DurabilityMode.Balanced) + { + _reassignmentTimer = + new Timer( + _ => { _runningBlock.Post(new ReassignDormantNodes(_runtime.Storage.Nodes, _database)); }, + _settings, _settings.FirstNodeReassignmentExecution, _settings.NodeReassignmentPollingTime); + } if (AutoStartScheduledJobPolling) { diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs index f2e7cc0f..9316b8f1 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.cs @@ -183,11 +183,13 @@ public async Task ReleaseIncomingAsync(int ownerId) { if (_cancellation.IsCancellationRequested) return; - await _dataSource + var count = await _dataSource .CreateCommand( $"update {SchemaName}.{DatabaseConstants.IncomingTable} set owner_id = 0 where owner_id = @owner") .With("owner", ownerId) .ExecuteNonQueryAsync(_cancellation); + + Logger.LogInformation("Reassigned {Count} incoming messages from {Owner} to any node in the durable inbox", count, ownerId); } public async Task ReleaseIncomingAsync(int ownerId, Uri receivedAt) diff --git a/wolverine.sln b/wolverine.sln index 1ef6421b..06d03e48 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -235,6 +235,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MartenTests", "src\Persiste EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PostgresqlTests", "src\Persistence\PostgresqlTests\PostgresqlTests.csproj", "{4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DuplicateMessageSending", "src\Persistence\DuplicateMessageSending\DuplicateMessageSending.csproj", "{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -592,6 +594,10 @@ Global {4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Debug|Any CPU.Build.0 = Debug|Any CPU {4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Release|Any CPU.ActiveCfg = Release|Any CPU {4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74}.Release|Any CPU.Build.0 = Release|Any CPU + {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {24497E6A-D6B1-4C80-ABFB-57FFAD5070C4} = {96119B5E-B5F0-400A-9580-B342EBE26212} @@ -696,5 +702,6 @@ Global {F3ECE3DC-153D-4F8A-AC48-2396F2B46ED7} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} {D01C99ED-B735-4854-A07D-B55E506A1441} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} {4257ECCC-A172-4FC0-BE0D-A60FDE0C8A74} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} + {1A34A78B-F6AB-41A9-8B90-384C6E1DBC63} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} EndGlobalSection EndGlobal