
Releases: Jroland/kafka-net

Performance Improvement Release

29 May 17:11

The focus of this release was to improve throughput of the driver. This version should see large gains in high-volume throughput as it balances the buffers better, enabling the TCP threads to always have data on deck for the next operation. It also includes several bug fixes.

  • Fix #60 AsyncCollection not exiting tight loop when timeout expires.
  • AsyncCollection updated to use ConcurrentQueue and reduced locking.
    • The old AsyncCollection turned out to be a significant bottleneck when under load.
  • Modified statistics tracker code to reduce the overhead of tracking data flow.
  • Change Gzip to use fastest compression and close streams.
    • This change increases throughput by a larger amount than you would expect, while only losing a few percent in compression level (a sketch of the approach follows this list).
  • Fix #57 correctly handle consumer max wait timeout when over int max size.
  • General code cleanup.
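
As a rough illustration of the gzip change above (a minimal sketch using the standard System.IO.Compression APIs, not the driver's actual compression code), the idea is to compress with CompressionLevel.Fastest and dispose the GZipStream so its trailing bytes are flushed:

using System.IO;
using System.IO.Compression;

static class GzipSketch
{
    public static byte[] Compress(byte[] payload)
    {
        using (var output = new MemoryStream())
        {
            // Fastest trades a few percent of compression ratio for noticeably higher throughput.
            using (var gzip = new GZipStream(output, CompressionLevel.Fastest))
            {
                gzip.Write(payload, 0, payload.Length);
            } // disposing the GZipStream closes it and flushes the gzip footer
            return output.ToArray(); // ToArray still works after the stream is closed
        }
    }
}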

Statistics Tracking and Faster Socket

30 Apr 03:40

Rewrite KafkaTcpSocket

The KafkaTcpSocket has been re-written (again) to reserve one thread per broker. Reads and writes are still performed asynchronously, but one thread is essentially reserved to quickly handle the return states of each async call. The old version had a lot of thread contention, and it was difficult to ensure high throughput when the system was busy with other tasks.
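
To illustrate the pattern (a minimal sketch; the names below are illustrative and not the actual KafkaTcpSocket types), a long-lived thread per broker drains a queue of pending socket operations and drives each one to completion, so completions are not starved when the thread pool is busy:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class DedicatedSocketWorker : IDisposable
{
    private readonly BlockingCollection<Func<Task>> _pending = new BlockingCollection<Func<Task>>();
    private readonly Thread _worker;

    public DedicatedSocketWorker(string brokerName)
    {
        // One long-lived thread per broker, so socket completions are handled promptly
        // even when the shared thread pool is busy with other work.
        _worker = new Thread(ProcessQueue) { IsBackground = true, Name = "socket-" + brokerName };
        _worker.Start();
    }

    public void Enqueue(Func<Task> operation)
    {
        _pending.Add(operation);
    }

    private void ProcessQueue()
    {
        foreach (var operation in _pending.GetConsumingEnumerable())
        {
            // The read/write itself is still asynchronous; this thread only drives
            // each operation to completion and handles its result.
            operation().GetAwaiter().GetResult();
        }
    }

    public void Dispose()
    {
        _pending.CompleteAdding();
        _worker.Join();
    }
}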

The rewrite was also in response to a bug found by Alex (#50).
The new changes partially cover the problem by doing the following.

The driver blocks produce requests from going out while it attempts to get a connection to the broker. Any new produce request coming in will be queued for sending, but its timeout will not start. Once the connection to a broker has been made, the produce request will be unblocked and the timeout will start for the send request. If an ack is requested, the timeout will start again for the ack-wait portion once the send operation has completed.
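
A minimal, hypothetical sketch of that sequencing (the method names are placeholders, not kafka-net APIs): no timeout runs while waiting for a connection, the send timeout starts once a connection exists, and a fresh timeout starts for the ack wait once the send completes.

using System;
using System.Threading;
using System.Threading.Tasks;

class ProduceTimeoutSketch
{
    public async Task<string> ProduceAsync(string request, bool ackRequested,
        TimeSpan sendTimeout, TimeSpan ackTimeout)
    {
        // Phase 0: wait for the broker connection. The request is queued, but no timeout is running yet.
        await EnsureConnectedAsync();

        // Phase 1: the send timeout starts only after the connection is available.
        using (var sendCts = new CancellationTokenSource(sendTimeout))
        {
            await SendAsync(request, sendCts.Token);
        }

        if (!ackRequested) return null;

        // Phase 2: a fresh timeout for waiting on the broker ack.
        using (var ackCts = new CancellationTokenSource(ackTimeout))
        {
            return await WaitForAckAsync(request, ackCts.Token);
        }
    }

    // Stub implementations so the sketch compiles; the real driver performs actual socket work here.
    private Task EnsureConnectedAsync() { return Task.Delay(10); }
    private Task SendAsync(string request, CancellationToken token) { return Task.Delay(10, token); }
    private Task<string> WaitForAckAsync(string request, CancellationToken token) { return Task.FromResult("ack"); }
}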

This new behavior covers the case where produce requests would silently time out while the driver blocked them to connect to the broker. It does not, however, cover the case of lost connections to brokers and new leader elections. The plan to solve this is to pass up a server-disconnected event and trigger a metadata query that redirects connections to the new leader. Issue #17 covers this.

Statistics Heartbeat

An initial roll out of statistics tracking has been added to the driver. The driver has a heartbeat which tracks internal driver traffic over a period of time. Snapshots are taken every 5 seconds and the data is stored in a capped collection of summarized snapshots. Because of this capped collection, all averages are calculated over a moving window of time.
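
As a rough illustration of the moving window (not the actual StatisticsTracker implementation), a capped queue of heartbeat snapshots might look like this:

using System;
using System.Collections.Generic;
using System.Linq;

class CappedSnapshotWindow
{
    private readonly int _capacity;
    private readonly Queue<double> _snapshots = new Queue<double>();

    public CappedSnapshotWindow(int capacity)
    {
        _capacity = capacity;
    }

    // Called on each heartbeat (every 5 seconds in the driver).
    public void AddSnapshot(double messagesPerSecond)
    {
        _snapshots.Enqueue(messagesPerSecond);
        if (_snapshots.Count > _capacity)
            _snapshots.Dequeue(); // drop the oldest snapshot once the cap is hit
    }

    // Any statistic derived from the capped collection is therefore a moving-window
    // average rather than an all-time average.
    public double MovingAverage()
    {
        return _snapshots.Count == 0 ? 0 : _snapshots.Average();
    }
}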

These stats are definitely a work in progress and will be changing a lot. However, they are extremely useful for tracking down bottlenecks in code and for watching the progress of producers. Here is some sample code. Please note that the event is located on a static object called StatisticsTracker; this is done on purpose to make it easy to exercise the feature externally, and it will move to a proper place in the future.

StatisticsTracker.OnStatisticsHeartbeat += WriteStatisticsToConsole;

void WriteStatisticsToConsole(StatisticsSummary stats)
{
    Console.WriteLine("Producer: Buffer: {0} AsyncQueued: {1}", _producer.BufferCount, _producer.AsyncCount);
    Console.WriteLine("Produced: Msgs: {0} New/s: {1}  MsgKilobytes/s: {2} PayloadKilobytes/s: {3} CompressionRatio: {4}",
        stats.ProduceRequestSummary.MessageCount,
        stats.ProduceRequestSummary.MessagesPerSecond,
        stats.ProduceRequestSummary.MessageKilobytesPerSecond,
        stats.ProduceRequestSummary.PayloadKilobytesPerSecond,
        stats.ProduceRequestSummary.AverageCompressionRatio);

    stats.NetworkWriteSummaries.ForEach(s =>
    {
        Console.WriteLine("Endpoint: {0}", s.Endpoint);
        if (s.QueueSummary != null)
        {
            Console.WriteLine("Q = Messages: {0}, Q Kilobytes: {1}, OldestInQueue: {2}, BatchCount: {3}",
                s.QueueSummary.QueuedMessages,
                s.QueueSummary.KilobytesQueued,
                s.QueueSummary.OldestBatchInQueue.TotalMilliseconds,
                s.QueueSummary.QueuedBatchCount);
        }

        if (s.TcpSummary != null)
        {
            Console.WriteLine("C = Msg/s: {0},  Last: {1},  Kilobytes/s: {2}, AvgTcpMS: {3} AvgTotalMS: {4} Async: {5}",
                s.TcpSummary.MessagesPerSecond,
                s.TcpSummary.MessagesLastBatch,
                s.TcpSummary.KilobytesPerSecond,
                s.TcpSummary.AverageWriteDuration.TotalMilliseconds,
                s.TcpSummary.AverageTotalDuration.TotalMilliseconds,
                stats.Gauges.ActiveWriteOperation);
        }
    });

    Console.WriteLine("Upload Rate: Msg/s: {0}  Kilobytes/s: {1}  Max Msg/s: {2}  Last Batch: {3}",
        stats.NetworkWriteSummaries.Where(x => x.TcpSummary != null).Sum(x => x.TcpSummary.MessagesPerSecond),
        stats.NetworkWriteSummaries.Where(x => x.TcpSummary != null).Sum(x => x.TcpSummary.KilobytesPerSecond),
        stats.NetworkWriteSummaries.Where(x => x.TcpSummary != null).Sum(x => x.TcpSummary.MaxMessagesPerSecond),
        stats.NetworkWriteSummaries.Where(x => x.TcpSummary != null).Sum(x => x.TcpSummary.MessagesLastBatch));

    Console.WriteLine("");
}

Bug Fixes

10 Apr 23:35

#51 Fix max buffer not properly blocking producer when hit.
#50 Fix ResponseTimeout triggered when SendAsync is waiting on connection.
#49 Fix AsyncLock throwing exception when cancelled.
#53 ExpectResponse should be true when requesting all replicas.

Note: Reverted some changes in #50 which caused thread contention.

Threading bug fix release

05 Apr 06:47

kafka-net Release Notes

Version 0.9.0.14

Fix memory leak in NagleBlockingCollection.
Timeout does not reset when new data is added.
Fix thread contention when producer has many threads loading data into its buffer.
Fix many deadlock scenarios on cancelling and disposing.
More unit tests around threading.

Nagle Producer Release

01 Apr 18:51

kafka-net Release Notes

Version 0.9.0.1

Feature: Nagle Producer

The producer class has been significantly updated to use a message batching technique similar to the Nagle algorithm.

The producer accepts messages and groups them together into a single batched transmission. The total number of messages to batch before sending, and the maximum amount of time to wait for the batch to fill, are both configurable. Tuning these two parameters, along with gzip compression, can increase the driver throughput by orders of magnitude.

var producer = new Producer(new BrokerRouter(options)) 
    { 
        BatchSize = 100,
        BatchDelayTime = TimeSpan.FromMilliseconds(100)
    };

// BatchSize - The producer will wait until it receives 100 messages, group them together into one request and send.
// BatchDelayTime - If the producer has not received 100 messages within 100 milliseconds, the producer will send what it has received.

Feature: Memory management

The producer now has better options for managing how much memory it consumes when it starts to get backed up.

There are now two parameters on the producer constructor:
MaximumAsyncRequests
MaximumMessageBuffer

These two parameters prevent the producer from exceeding a maximum amount of network and memory resources.

MaximumMessageBuffer

This parameter represents the maximum number of messages the producer will hold in its buffer at any one time. This includes all in-flight messages and those buffered by the batching mechanism. This maximum will be hit if messages arrive at the producer faster than it can send them to Kafka. When the maximum is reached, the producer will block on any new messages until space is available.

MaximumAsyncRequests

This parameter represents the maximum number of queued-up async TCP commands allowed in flight at any one time. This can occur when the batch size is too low and the producer creates a high number of transmission requests out to the Kafka brokers. Having thousands of queued-up async messages can adversely affect memory use and increase timeout errors.

var producer = new Producer(new BrokerRouter(options), maximumAsyncRequests: 30, maximumMessageBuffer:1000);

//maximum outbound async requests will be limited to 30
//maximum amount of messages in the producer at any one time will be limited to 1000
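
For example, here is a rough usage sketch of the blocking behavior (assuming the producer's SendMessageAsync(topic, messages) call; the topic name is illustrative): once 1000 messages are buffered or in flight, the next send blocks until space frees up.

var producer = new Producer(new BrokerRouter(options), maximumAsyncRequests: 30, maximumMessageBuffer: 1000);

for (int i = 0; i < 1000000; i++)
{
    // Once the 1000-message buffer is full, this call blocks until earlier batches have been sent.
    producer.SendMessageAsync("TestTopic", new[] { new Message("message " + i) });
}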

Issues/Features Summary

  • Fix some integration tests to run on any Kafka configuration. More need conversion.
  • Redesign of TcpKafkaSockets
    • Performance improvements
    • Remove several deadlock scenarios
    • Remove several race conditions
  • Nagle producer
    • Memory management
    • Significant performance improvement
  • Add MaximumReconnectionTimeout
    • Put a maximum amount of time to wait when backing off
  • Update documentation in code
  • Update/extend unit tests