Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

High level consumer with non-blocking feature #23

Open
wants to merge 15 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ publish/
# NuGet Packages Directory
## TODO: If you have NuGet Package Restore enabled, uncomment the next line
packages/
*.nupkg

# Windows Azure Build Output
csx
Expand Down
22 changes: 18 additions & 4 deletions src/TestHarness/TestHarness.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@
<RootNamespace>TestHarness</RootNamespace>
<AssemblyName>TestHarness</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<FileAlignment>4096</FileAlignment>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<NoWin32Manifest>False</NoWin32Manifest>
<AllowUnsafeBlocks>False</AllowUnsafeBlocks>
<NoStdLib>False</NoStdLib>
<TreatWarningsAsErrors>False</TreatWarningsAsErrors>
<IntermediateOutputPath>obj\$(Configuration)\</IntermediateOutputPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<PlatformTarget>x86</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<DebugType>Full</DebugType>
<Optimize>False</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
Expand All @@ -33,6 +38,15 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<CheckForOverflowUnderflow>False</CheckForOverflowUnderflow>
<BaseIntermediateOutputPath>obj\</BaseIntermediateOutputPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Platform)' == 'AnyCPU' ">
<BaseAddress>4194304</BaseAddress>
<RegisterForComInterop>False</RegisterForComInterop>
<GenerateSerializationAssemblies>Auto</GenerateSerializationAssemblies>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
2 changes: 1 addition & 1 deletion src/kafka-net.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@
<tags>C# Apache Kafka</tags>
</metadata>
<files>
<file src="lib\*" target="lib\net45" />
<file src="kafka-net\lib\*" target="kafka-net\lib\net45" />
</files>
</package>
15 changes: 8 additions & 7 deletions src/kafka-net.sln
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
# Visual Studio 2012
# SharpDevelop 4.4
VisualStudioVersion = 12.0.30723.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{106F20D4-E22F-4C73-9D48-7F38E2A77163}"
ProjectSection(SolutionItems) = preProject
.nuget\NuGet.Config = .nuget\NuGet.Config
Expand All @@ -23,6 +18,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
..\README.md = ..\README.md
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestHarness", "TestHarness\TestHarness.csproj", "{53E0B3CE-6C41-4C8A-8B66-9BD03667B1E0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
6 changes: 6 additions & 0 deletions src/kafka-net/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,11 @@ public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationT

return await task;
}

public static void times(this int n, Action action){
for (int i = 0; i < n; i++) {
action.Invoke();
}
}
}
}
29 changes: 14 additions & 15 deletions src/kafka-net/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ namespace KafkaNet
/// </summary>
public class Consumer : IMetadataQueries, IDisposable
{
private readonly ConsumerOptions _options;
private readonly BlockingCollection<Message> _fetchResponseQueue;
private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
private readonly ConcurrentDictionary<int, Task> _partitionPollingIndex = new ConcurrentDictionary<int, Task>();
private readonly ConcurrentDictionary<int, long> _partitionOffsetIndex = new ConcurrentDictionary<int, long>();
private readonly IScheduledTimer _topicPartitionQueryTimer;
private readonly IMetadataQueries _metadataQueries;

private int _disposeCount;
private int _ensureOneThread;
private Topic _topic;
protected readonly ConsumerOptions _options;
protected readonly BlockingCollection<Message> _fetchResponseQueue;
protected readonly CancellationTokenSource _disposeToken = new CancellationTokenSource();
protected readonly ConcurrentDictionary<int, Task> _partitionPollingIndex = new ConcurrentDictionary<int, Task>();
protected readonly ConcurrentDictionary<int, long> _partitionOffsetIndex = new ConcurrentDictionary<int, long>();
protected readonly IScheduledTimer _topicPartitionQueryTimer;
protected readonly IMetadataQueries _metadataQueries;

protected int _disposeCount;
protected int _ensureOneThread;
protected Topic _topic;

public Consumer(ConsumerOptions options, params OffsetPosition[] positions)
{
Expand Down Expand Up @@ -87,7 +87,7 @@ public List<OffsetPosition> GetOffsetPosition()
return _partitionOffsetIndex.Select(x => new OffsetPosition { PartitionId = x.Key, Offset = x.Value }).ToList();
}

private void RefreshTopicPartitions()
protected void RefreshTopicPartitions()
{
try
{
Expand Down Expand Up @@ -143,7 +143,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
{
Topic = topic,
PartitionId = partitionId,
Offset = offset
Offset = offset,
MaxBytes = _options.MaxMessageSize
}
};

Expand All @@ -164,13 +165,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId)
foreach (var message in response.Messages)
{
_fetchResponseQueue.Add(message, _disposeToken.Token);

if (_disposeToken.IsCancellationRequested) return;
}

var nextOffset = response.Messages.Max(x => x.Meta.Offset) + 1;
_partitionOffsetIndex.AddOrUpdate(partitionId, i => nextOffset, (i, l) => nextOffset);

// sleep is not needed if responses were received
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/kafka-net/Interfaces/IMetadataQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaNet
/// <summary>
/// Contains common metadata query commands that are used by both a consumer and producer.
/// </summary>
interface IMetadataQueries : IDisposable
public interface IMetadataQueries : IDisposable
{
/// <summary>
/// Get metadata on the given topic.
Expand Down
5 changes: 4 additions & 1 deletion src/kafka-net/Model/ConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class ConsumerOptions
{
private const int DefaultMaxConsumerBufferSize = 50;
private const int DefaultBackoffIntervalMS = 1000;

private const int DefaultMaxMsgSize = 4096 * 1024; // default to be 4 MB for max message size.
/// <summary>
/// The topic to consume messages from.
/// </summary>
Expand Down Expand Up @@ -37,6 +37,8 @@ public class ConsumerOptions
/// </summary>
public TimeSpan BackoffInterval { get; set; }

public int MaxMessageSize { get; set; }

public ConsumerOptions(string topic, IBrokerRouter router)
{
Topic = topic;
Expand All @@ -46,6 +48,7 @@ public ConsumerOptions(string topic, IBrokerRouter router)
TopicPartitionQueryTimeMs = (int)TimeSpan.FromMinutes(15).TotalMilliseconds;
ConsumerBufferSize = DefaultMaxConsumerBufferSize;
BackoffInterval = TimeSpan.FromMilliseconds(DefaultBackoffIntervalMS);
MaxMessageSize = DefaultMaxMsgSize;
}
}
}
159 changes: 159 additions & 0 deletions src/kafka-net/NativeHLConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
 * Created by SharpDevelop.
 * User: peng.zang
 * Date: 11/11/2014
 * Time: 10:49 AM
 *
 * To change this template use Tools | Options | Coding | Edit Standard Headers.
 */
using System;
using System.Linq;
using System.Threading;
using System.Collections.Generic;

using KafkaNet.Protocol;
using KafkaNet.Model;
using KafkaNet.Common;

namespace KafkaNet
{
    /// <summary>
    /// A high-level consumer with consumer-group support. It automatically commits the offset
    /// for the group and returns a non-blocking message list to the client.
    /// TODO: Make sure offset tracking works in parallel (right now it consumes in an "at least once" manner).
    /// NOTE(review): this class blocks on async calls via .Result throughout; that can deadlock
    /// when used from a synchronization-context host (e.g. UI thread) — confirm intended usage.
    /// </summary>
    public class NativeHLConsumer : Consumer
    {
        // Name of the consumer group whose offsets are fetched and committed.
        protected string _consumerGroup;

        public NativeHLConsumer(ConsumerOptions options, string consumerGroup, params OffsetPosition[] positions)
            : base(options, positions)
        {
            // The base constructor may not have populated topic metadata for our topic yet;
            // resolve it here so RefreshOffsets can enumerate partitions.
            if (_topic == null || _topic.Name != _options.Topic)
                _topic = _metadataQueries.GetTopic(_options.Topic);
            _consumerGroup = consumerGroup;
            RefreshOffsets();
        }

        /// <summary>
        /// Refreshes offsets by fetching the stored offset for this._consumerGroup from the Kafka
        /// server for every partition of the topic. If a stored offset falls outside the current
        /// [min, max] offset range of its partition, the partition's minimum offset is used instead.
        /// The results are written into _partitionOffsetIndex, which drives the polling tasks.
        /// </summary>
        public void RefreshOffsets()
        {
            // Actual server-side offset ranges, used to validate the group's stored offsets.
            var actualOffsets = _metadataQueries.GetTopicOffsetAsync(_options.Topic).Result;
            var maxminGroups = actualOffsets.Select(x => new { pid = x.PartitionId, min = x.Offsets.Min(), max = x.Offsets.Max() });

            _topic.Partitions.ForEach(
                partition =>
                {
                    _options.Router.SelectBrokerRoute(_topic.Name, partition.PartitionId).Connection
                        .SendAsync(CreateOffsetFetchRequest(_consumerGroup, partition.PartitionId))
                        .Result.ForEach(
                            offsetResp =>
                            {
                                // Use the configured logger rather than Console so output follows
                                // the library's logging conventions.
                                _options.Log.DebugFormat("fetch offset: {0}", offsetResp.ToString());

                                if (actualOffsets.Any(x => x.PartitionId == partition.PartitionId))
                                {
                                    // Clamp a stale/out-of-range stored offset back to the
                                    // earliest available offset for the partition.
                                    var actual = maxminGroups.First(x => x.pid == partition.PartitionId);
                                    if (actual.min > offsetResp.Offset || actual.max < offsetResp.Offset)
                                    {
                                        offsetResp.Offset = actual.min;
                                    }
                                }
                                _partitionOffsetIndex.AddOrUpdate(partition.PartitionId, i => offsetResp.Offset, (i, l) => offsetResp.Offset);
                            });
                }
            );
        }

        /// <summary>
        /// Consumes up to <paramref name="num"/> messages in one call and stops consuming at the
        /// end of the call. Each accepted message's offset (message offset + 1) is committed for
        /// the consumer group before the message is added to the result.
        /// </summary>
        /// <param name="num">Number of messages to consume.</param>
        /// <param name="timeout">Per-message wait (ms) on the internal queue; if no message
        /// arrives within this window the method gives up.</param>
        /// <returns>The consumed messages, or null when the queue wait times out before
        /// <paramref name="num"/> messages were collected (existing contract, preserved).</returns>
        public IEnumerable<Message> Consume(int num, int timeout = 1000)
        {
            List<Message> result = new List<Message>();

            _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic);
            _topicPartitionQueryTimer.Begin();

            while (result.Count < num)
            {
                Message temp = null;
                if (!_fetchResponseQueue.TryTake(out temp, timeout))
                {
                    // Preserved behavior: a timeout discards any partially collected
                    // (already committed) messages and reports failure with null.
                    return null;
                }

                if (temp != null)
                {
                    var conn = _options.Router.SelectBrokerRoute(_topic.Name, temp.Meta.PartitionId).Connection;
                    var offsets = conn.SendAsync(CreateOffsetFetchRequest(_consumerGroup, temp.Meta.PartitionId)).Result;
                    var x = offsets.FirstOrDefault();

                    if (x != null && x.PartitionId == temp.Meta.PartitionId)
                    {
                        if (x.Offset > temp.Meta.Offset)
                        {
                            // The group's committed offset is already past this message:
                            // it was delivered before, so skip it.
                            _options.Log.DebugFormat("GET Duplicated message");
                        }
                        else
                        {
                            // Commit first, then hand the message to the caller; a failed
                            // commit drops the message so it can be re-delivered.
                            if (CommitOffset(conn, temp.Meta.PartitionId, temp.Meta.Offset + 1))
                                result.Add(temp);
                        }
                    }
                }
            }
            return result;
        }

        /// <summary>
        /// Commits <paramref name="offset"/> for partition <paramref name="pid"/> on behalf of the
        /// consumer group. Returns true when the broker reports no error.
        /// </summary>
        protected bool CommitOffset(IKafkaConnection conn, int pid, long offset)
        {
            var resp = conn.SendAsync(CreateOffsetCommitRequest(_consumerGroup, pid, offset)).Result.FirstOrDefault();
            return resp != null && ((int)resp.Error) == (int)ErrorResponseCode.NoError;
        }

        /// <summary>
        /// Builds an OffsetFetchRequest asking the broker for the group's stored offset of a
        /// single partition of the configured topic.
        /// </summary>
        protected OffsetFetchRequest CreateOffsetFetchRequest(string consumerGroup, int partitionId)
        {
            var request = new OffsetFetchRequest
            {
                ConsumerGroup = consumerGroup,
                Topics = new List<OffsetFetch>
                {
                    new OffsetFetch
                    {
                        PartitionId = partitionId,
                        Topic = _options.Topic
                    }
                }
            };

            return request;
        }

        /// <summary>
        /// Builds an OffsetCommitRequest that stores <paramref name="offset"/> for a single
        /// partition under the given consumer group, with optional metadata.
        /// </summary>
        protected OffsetCommitRequest CreateOffsetCommitRequest(string consumerGroup, int partitionId, long offset, string metadata = null)
        {
            var commit = new OffsetCommitRequest
            {
                ConsumerGroup = consumerGroup,
                OffsetCommits = new List<OffsetCommit>
                {
                    new OffsetCommit
                    {
                        PartitionId = partitionId,
                        Topic = _topic.Name,
                        Offset = offset,
                        Metadata = metadata
                    }
                }
            };

            return commit;
        }
    }
}
3 changes: 3 additions & 0 deletions src/kafka-net/Protocol/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public static IEnumerable<Message> DecodeMessage(long offset, byte[] payload)
throw new NotSupportedException(string.Format("Codec type of {0} is not supported.", codec));
}
}

}

/// <summary>
Expand All @@ -200,5 +201,7 @@ public class MessageMetadata
/// The partition id this offset is from.
/// </summary>
public int PartitionId { get; set; }


}
}
Loading