Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr/check destination null #1093

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/NetMQ.Tests/ClientServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void Tcp()
}

[Fact]
public async void Async()
public async Task Async()
{
using var server = new ServerSocket();
using var client = new ClientSocket();
Expand All @@ -72,7 +72,7 @@ public async void Async()
}

[Fact]
public async void AsyncWithCancellationToken()
public async Task AsyncWithCancellationToken()
{
using CancellationTokenSource source = new CancellationTokenSource();
using var server = new ServerSocket();
Expand All @@ -85,7 +85,7 @@ public async void AsyncWithCancellationToken()
#if NETCOREAPP3_1

[Fact(Timeout = 120)]
public async void AsyncEnumerableCanceled()
public async Task AsyncEnumerableCanceled()
{
using CancellationTokenSource source = new CancellationTokenSource();
using var server = new ServerSocket();
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ.Tests/MessageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void Issue52_ReqToRouterBug()

var msg = router.ReceiveMultipartMessage();
Assert.Equal(3, msg.FrameCount);
Assert.Equal(msg[2].ConvertToString(), testmessage);
Assert.Equal(testmessage, msg[2].ConvertToString());
}
}

Expand Down
26 changes: 20 additions & 6 deletions src/NetMQ.Tests/NetMQ.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<!-- have to teachcd MSBuild where the Mono copy of the reference asssemblies is -->
<TargetIsMono Condition="$(TargetFramework.StartsWith('net4')) and '$(OS)' == 'Unix'">true</TargetIsMono>

<!-- Look in the standard install locations -->
<!-- Look in the standard install locations -->
<BaseFrameworkPathOverrideForMono Condition="'$(BaseFrameworkPathOverrideForMono)' == '' AND '$(TargetIsMono)' == 'true' AND EXISTS('/Library/Frameworks/Mono.framework/Versions/Current/lib/mono')">/Library/Frameworks/Mono.framework/Versions/Current/lib/mono</BaseFrameworkPathOverrideForMono>
<BaseFrameworkPathOverrideForMono Condition="'$(BaseFrameworkPathOverrideForMono)' == '' AND '$(TargetIsMono)' == 'true' AND EXISTS('/usr/lib/mono')">/usr/lib/mono</BaseFrameworkPathOverrideForMono>
<BaseFrameworkPathOverrideForMono Condition="'$(BaseFrameworkPathOverrideForMono)' == '' AND '$(TargetIsMono)' == 'true' AND EXISTS('/usr/local/lib/mono')">/usr/local/lib/mono</BaseFrameworkPathOverrideForMono>
Expand All @@ -39,14 +39,28 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.5.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="xunit" Version="2.4.2-pre.13" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net47' ">
<PackageReference Include="ZeroMQ" Version="4.1.0.31" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' != 'netcoreapp3.1' ">
<PackageReference Include="xunit" Version="2.8.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp3.1' ">
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions src/NetMQ.Tests/NetMQMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void StartAsync()
Thread.Sleep(200);
Assert.Equal(TaskStatus.Running, task.Status);
monitor.Stop();
Assert.True(task.Wait(TimeSpan.FromMilliseconds(1000)));
Assert.True(TaskUtils.Wait(task, TimeSpan.FromMilliseconds(1000)));
}
}
#endif
Expand Down Expand Up @@ -154,7 +154,7 @@ public void MonitorDisposeProperlyWhenDisposedAfterMonitoredTcpSocket()
}
Thread.Sleep(100);
// Monitor.Dispose should complete
var completed = Task.Factory.StartNew(() => monitor.Dispose()).Wait(1000);
var completed = TaskUtils.Wait(Task.Factory.StartNew(() => monitor.Dispose()), TimeSpan.FromMilliseconds(1000));
Assert.True(completed);
}
// NOTE If this test fails, it will hang because context.Dispose will block
Expand Down
18 changes: 9 additions & 9 deletions src/NetMQ.Tests/NetMQPollerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void RemoveSocket()

poller.Stop();
// await the pollerTask, 1ms should suffice
pollerTask.Wait(1);
TaskUtils.Wait(pollerTask, TimeSpan.FromMilliseconds(1));
Assert.True(pollerTask.IsCompleted);
}
}
Expand Down Expand Up @@ -879,7 +879,7 @@ public void OneTask()
Assert.True(poller.CanExecuteTaskInline, "Should be on NetMQPoller thread");
});
task.Start(poller);
task.Wait();
TaskUtils.Wait(task);

Assert.True(triggered);
}
Expand All @@ -894,7 +894,7 @@ public void SetsCurrentTaskScheduler()

var task = new Task(() => Assert.Same(TaskScheduler.Current, poller));
task.Start(poller);
task.Wait();
TaskUtils.Wait(task);
}
}

Expand All @@ -911,7 +911,7 @@ public void CanExecuteTaskInline()

var task = new Task(() => Assert.True(poller.CanExecuteTaskInline));
task.Start(poller);
task.Wait();
TaskUtils.Wait(task);
}
}

Expand Down Expand Up @@ -941,8 +941,8 @@ public void ContinueWith()
}, poller);

task.Start(poller);
task.Wait();
task2.Wait();
TaskUtils.Wait(task);
TaskUtils.Wait(task2);

Assert.Equal(threadId1, threadId2);
Assert.Equal(1, runCount1);
Expand Down Expand Up @@ -982,9 +982,9 @@ public void TwoThreads()
}
});

t1.Wait(1000);
t2.Wait(1000);
Task.WaitAll(allTasks.ToArray(), 1000);
TaskUtils.Wait(t1, TimeSpan.FromMilliseconds(1000));
TaskUtils.Wait(t2, TimeSpan.FromMilliseconds(1000));
TaskUtils.WaitAll(allTasks, TimeSpan.FromMilliseconds(1000));

Assert.Equal(100, count1);
Assert.Equal(100, count2);
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ.Tests/NetMQQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void EnqueueShouldNotBlockWhenCapacityIsZero()
}
});

bool completed = task.Wait(TimeSpan.FromSeconds(1));
bool completed = TaskUtils.Wait(task, TimeSpan.FromSeconds(1));
Assert.True(completed, "Enqueue task should have completed " + socketWatermarkCapacity + " enqueue within 1 second");
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/NetMQ.Tests/PgmTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ public void Sending1000Messages()
}
});

pubTask.Wait();
subTask.Wait();
TaskUtils.Wait(pubTask);
TaskUtils.Wait(subTask);

Assert.Equal(1000, count);
}
Expand Down Expand Up @@ -291,7 +291,7 @@ public void SubscriberCleanupOnUnbind(string address)

monitor.Stop();

monitorTask.Wait();
TaskUtils.Wait(monitorTask);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/NetMQ.Tests/RadioDish.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void TestBlocking()
}

[Fact]
public async void TestAsync()
public async Task TestAsync()
{
using var radio = new RadioSocket();
using var dish = new DishSocket();
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ.Tests/RouterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void ReceiveReadyDot35Bug()
using (var server = new RouterSocket())
{
server.BindRandomPort("tcp://127.0.0.1");
server.ReceiveReady += (s, e) => Assert.True(false, "Should not receive");
server.ReceiveReady += (s, e) => Assert.Fail("Should not receive");

Assert.False(server.Poll(TimeSpan.FromMilliseconds(1500)));
}
Expand Down
3 changes: 2 additions & 1 deletion src/NetMQ.Tests/ScatterGather.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System.Threading.Tasks;
using NetMQ.Sockets;
using Xunit;

Expand Down Expand Up @@ -47,7 +48,7 @@ public void TestBlocking()
}

[Fact]
public async void TestAsync()
public async Task TestAsync()
{
using var scatter = new ScatterSocket();
using var gather = new GatherSocket();
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ.Tests/SocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void ReceiveMessageWithTimeout()
t1.Start();
t2.Start();

Task.WaitAll(t1, t2);
TaskUtils.WaitAll(new[]{t1, t2});
}
}

Expand Down
56 changes: 56 additions & 0 deletions src/NetMQ.Tests/TaskUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace NetMQ.Tests
{
internal class TaskUtils
{
internal static async Task PollUntil(Func<bool> condition, TimeSpan timeout)
{
var cts = new CancellationTokenSource();
cts.CancelAfter(timeout);

await PollUntil(condition, cts.Token);
}

internal static async Task PollUntil(Func<bool> condition, CancellationToken ct = default)
{
try
{
while (!condition())
{
await Task.Delay(25, ct).ConfigureAwait(true);
}
}
catch (TaskCanceledException)
{
// Task was cancelled. Ignore exception and return.
}
}

internal static bool WaitAll(IEnumerable<Task> tasks, TimeSpan timeout)
{
PollUntil(() => tasks.All(t => t.IsCompleted), timeout).Wait();
return tasks.All(t => t.Status == TaskStatus.RanToCompletion);
}

internal static void WaitAll(IEnumerable<Task> tasks)
{
PollUntil(() => tasks.All(t => t.IsCompleted), Timeout.InfiniteTimeSpan).Wait();
}

internal static bool Wait(Task task, TimeSpan timeout)
{
PollUntil(() => task.IsCompleted, timeout).Wait();
return task.Status == TaskStatus.RanToCompletion;
}

internal static void Wait(Task task)
{
PollUntil(() => task.IsCompleted, Timeout.InfiniteTimeSpan).Wait();
}
}
}
6 changes: 3 additions & 3 deletions src/NetMQ.Tests/XPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void Manual()
sub.SendFrame(new byte[] { 1, (byte)'A' });
var subscription = pub.ReceiveFrameBytes();

Assert.Equal(subscription[1], (byte)'A');
Assert.Equal((byte)'A', subscription[1]);

pub.Subscribe("B");
pub.SendFrame("A");
Expand All @@ -356,7 +356,7 @@ public void WelcomeMessage()

var subscription = pub.ReceiveFrameBytes();

Assert.Equal(subscription[1], (byte)'W');
Assert.Equal((byte)'W', subscription[1]);

Assert.Equal("W", sub.ReceiveFrameString());
}
Expand All @@ -377,7 +377,7 @@ public void ClearWelcomeMessage()

var subscription = pub.ReceiveFrameBytes();

Assert.Equal(subscription[1], (byte)'W');
Assert.Equal((byte)'W', subscription[1]);

Assert.False(sub.TrySkipFrame());
}
Expand Down
10 changes: 3 additions & 7 deletions src/NetMQ/Core/Mailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,9 @@ public bool TryRecv(int timeout, out Command command)
return false;
}

// We've got the signal. Now we can switch into active state.
m_active = true;

// Get a command.
var ok = m_commandPipe.TryRead(out command);
Debug.Assert(ok);
return ok;
// We've got the signal. Now we can switch into active state if we can read.
m_active = m_commandPipe.TryRead(out command);
return m_active;
}

/// <summary>
Expand Down
3 changes: 1 addition & 2 deletions src/NetMQ/Core/SocketBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,8 +1261,7 @@ private void ProcessCommands(int timeout, bool throttle, CancellationToken cance
// Process all the commands available at the moment.
while (found)
{
Assumes.NotNull(command.Destination);
command.Destination.ProcessCommand(command);
command.Destination?.ProcessCommand(command);
found = m_mailbox.TryRecv(0, out command);
}

Expand Down
22 changes: 15 additions & 7 deletions src/NetMQ/Core/YPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,25 @@ public bool CheckRead()
/// <returns><c>true</c> if the read succeeded, otherwise <c>false</c>.</returns>
public bool TryRead([MaybeNullWhen(returnValue: false)] out T value)
{
// Try to prefetch a value.
if (!CheckRead())
try
{
// Try to prefetch a value.
if (!CheckRead())
{
value = default(T);
return false;
}

// There was at least one value prefetched.
// Return it to the caller.
value = m_queue.Pop();
return true;
}
catch
{
value = default(T);
return false;
}

// There was at least one value prefetched.
// Return it to the caller.
value = m_queue.Pop();
return true;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/NetMQ/NetMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PackageReference Include="System.ServiceModel.Primitives" Version="4.9.0" />
<PackageReference Include="System.ServiceModel.Primitives" Version="4.10.3" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.1' ">
<PackageReference Include="System.ServiceModel.Primitives" Version="4.9.0" />
<PackageReference Include="System.ServiceModel.Primitives" Version="4.10.3" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' == '.NETFramework' ">
Expand Down
Loading