Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added .Net 4.0 support #26

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/kafka-net.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<tags>C# Apache Kafka</tags>
</metadata>
<files>
<file src="lib\*" target="lib\net45" />
<file src="kafka-net40\bin\release\kafka-net*" target="lib\net40" />
<file src="kafka-net\bin\release\kafka-net*" target="lib\net45" />
</files>
</package>
</package>
12 changes: 12 additions & 0 deletions src/kafka-net.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\README.md = ..\README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net40", "kafka-net40\kafka-net40.csproj", "{E8DBFB90-FAC3-4083-8116-74291B864443}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests40", "kafka-tests40\kafka-tests40.csproj", "{7A0D1379-A6E8-4F23-A3B5-56AD3121377D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -41,6 +45,14 @@ Global
{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}.Release|Any CPU.Build.0 = Release|Any CPU
{E8DBFB90-FAC3-4083-8116-74291B864443}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8DBFB90-FAC3-4083-8116-74291B864443}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8DBFB90-FAC3-4083-8116-74291B864443}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8DBFB90-FAC3-4083-8116-74291B864443}.Release|Any CPU.Build.0 = Release|Any CPU
{7A0D1379-A6E8-4F23-A3B5-56AD3121377D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7A0D1379-A6E8-4F23-A3B5-56AD3121377D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7A0D1379-A6E8-4F23-A3B5-56AD3121377D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7A0D1379-A6E8-4F23-A3B5-56AD3121377D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
4 changes: 4 additions & 0 deletions src/kafka-net/Common/BigEndianBinaryReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public BigEndianBinaryReader(Stream input)
}

public BigEndianBinaryReader(Stream input, Boolean leaveOpen)
#if NET40
: base(input, Encoding.UTF8)
#else
: base(input, Encoding.UTF8, leaveOpen)
#endif
{
Contract.Requires(input != null);
}
Expand Down
4 changes: 4 additions & 0 deletions src/kafka-net/Common/BigEndianBinaryWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public BigEndianBinaryWriter(Stream stream)
}

public BigEndianBinaryWriter(Stream stream, Boolean leaveOpen)
#if NET40
: base(stream, Encoding.UTF8)
#else
: base(stream, Encoding.UTF8, leaveOpen)
#endif
{
Contract.Requires(stream != null);
}
Expand Down
8 changes: 6 additions & 2 deletions src/kafka-net/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationT

using (cancellationToken.Register(source => ((TaskCompletionSource<bool>)source).TrySetResult(true), tcs))
{
#if NET40
if (task != await TaskEx.WhenAny(task, tcs.Task))
#else
if (task != await Task.WhenAny(task, tcs.Task))
#endif
{
throw new OperationCanceledException(cancellationToken);
}
}
}
}

return await task;
}
Expand Down
13 changes: 11 additions & 2 deletions src/kafka-net/Common/ThreadWall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ public void Release()
/// <returns>Task handle to signal passage allowed.</returns>
public Task RequestPassageAsync()
{
#if NET40
return AsTask(_semaphore.AvailableWaitHandle, new TimeSpan(0,0,0,0,-1));
#else
return AsTask(_semaphore.AvailableWaitHandle, Timeout.InfiniteTimeSpan);
}
#endif
}

private static Task AsTask(WaitHandle handle, TimeSpan timeout)
{
Expand All @@ -89,8 +93,13 @@ private static Task AsTask(WaitHandle handle, TimeSpan timeout)
else
localTcs.TrySetResult(null);
}, tcs, timeout, executeOnlyOnce: true);
#if NET40
tcs.Task.ContinueWith((_) => registration.Unregister(null), TaskScheduler.Default);
#else
tcs.Task.ContinueWith((_, state) => ((RegisteredWaitHandle)state).Unregister(null), registration, TaskScheduler.Default);
return tcs.Task;

#endif
return tcs.Task;
}
}
}
21 changes: 16 additions & 5 deletions src/kafka-net/KafkaTcpSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, int delayConnectAtt
{
_log = log;
_endpoint = endpoint;
#if NET40
TaskEx.Delay(TimeSpan.FromMilliseconds(delayConnectAttemptMS)).ContinueWith(x => TriggerReconnection());
#else
Task.Delay(TimeSpan.FromMilliseconds(delayConnectAttemptMS)).ContinueWith(x => TriggerReconnection());
#endif
}

#region Interface Implementation...
Expand Down Expand Up @@ -127,8 +131,12 @@ private async Task<byte[]> EnsureReadAsync(int readSize, CancellationToken token
{
var cancelTaskToken = new CancellationTokenRegistration();
try
{
await _singleReaderSemaphore.WaitAsync(token);
{
#if NET40
await TaskEx.Run(()=>_singleReaderSemaphore.Wait(token));
#else
await _singleReaderSemaphore.WaitAsync(token);
#endif

var result = new List<byte>();
var bytesReceived = 0;
Expand Down Expand Up @@ -210,9 +218,12 @@ private async Task<TcpClient> ReEstablishConnectionAsync()
reconnectionDelay = reconnectionDelay * DefaultReconnectionTimeoutMultiplier;
_log.WarnFormat("Failed re-connection to:{0}. Will retry in:{1}", _endpoint, reconnectionDelay);
}

await Task.Delay(TimeSpan.FromMilliseconds(reconnectionDelay), _disposeToken.Token);
}
#if NET40
await TaskEx.Delay(TimeSpan.FromMilliseconds(reconnectionDelay), _disposeToken.Token);
#else
await Task.Delay(TimeSpan.FromMilliseconds(reconnectionDelay), _disposeToken.Token);
#endif
}

return _client;
}
Expand Down
7 changes: 6 additions & 1 deletion src/kafka-net/MetadataQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,14 @@ public Task<List<OffsetResponse>> GetTopicOffsetAsync(string topic, int maxOffse
return route.Connection.SendAsync(request);
}).ToArray();

#if NET40
return TaskEx.WhenAll(sendRequests)
.ContinueWith(t => sendRequests.SelectMany(x => x.Result).ToList());
#else
return Task.WhenAll(sendRequests)
.ContinueWith(t => sendRequests.SelectMany(x => x.Result).ToList());
}
#endif
}

/// <summary>
/// Get metadata on the given topic.
Expand Down
9 changes: 6 additions & 3 deletions src/kafka-net/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ into routes
};

sendTasks.Add(route.Key.Connection.SendAsync(request));
}

await Task.WhenAll(sendTasks.ToArray());
}
#if NET40
await TaskEx.WhenAll(sendTasks.ToArray());
#else
await Task.WhenAll(sendTasks.ToArray());
#endif

return sendTasks.SelectMany(t => t.Result).ToList();
}
Expand Down
15 changes: 15 additions & 0 deletions src/kafka-net40/app.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<runtime>
<assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
<dependentAssembly>
<assemblyIdentity name="System.Runtime" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-2.6.8.0" newVersion="2.6.8.0" />
</dependentAssembly>
<dependentAssembly>
<assemblyIdentity name="System.Threading.Tasks" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-2.6.8.0" newVersion="2.6.8.0" />
</dependentAssembly>
</assemblyBinding>
</runtime>
</configuration>
Loading