Statistics Tracking and Faster Socket

@Jroland Jroland released this 30 Apr 03:40
· 18 commits to master since this release

Rewrite KafkaTcpSocket

The KafkaTcpSocket has been rewritten (again) to reserve one thread per broker. Reads and writes are still performed asynchronously, but one thread is reserved to handle the completion of each async call quickly. The old version suffered from heavy thread contention, which made it difficult to sustain high throughput when the system was busy with other tasks.

The rewrite was also a response to a bug found by Alex: #50
The new changes partially cover the problem by doing the following.

The driver holds back produce requests while it attempts to connect to their broker. Any new produce request that arrives is queued for sending, but its timeout does not start. Once the connection to the broker has been made, the produce request is unblocked and the timeout starts for the send operation. If an ack is requested, the timeout starts again for the ack wait once the send operation has completed.

This covers the case where produce requests would silently time out while the driver held them back to connect to the broker. It does not, however, cover lost connections to brokers or new leader elections. The plan for those cases is to pass up a server-disconnected event and trigger a metadata query to redirect connections to the new leader. Issue #17 covers this.
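The queue-then-timeout ordering described above can be sketched roughly as follows. This is only an illustration, not the driver's actual code: `ConnectThenSend`, `SendAsync`, and the `connected`, `send`, and `waitForAck` parameters are hypothetical names invented for this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; none of these names come from the driver itself.
public static class ConnectThenSend
{
    public static async Task<TAck> SendAsync<TAck>(
        Task connected,                                  // completes once the broker connection is up
        Func<CancellationToken, Task> send,              // performs the actual send
        Func<CancellationToken, Task<TAck>> waitForAck,  // null when no ack was requested
        TimeSpan timeout)
    {
        // Queued phase: the request waits for the connection and no timeout is running.
        await connected.ConfigureAwait(false);

        // Send phase: the timeout clock starts only after the connection exists.
        using (var sendTimeout = new CancellationTokenSource(timeout))
        {
            await send(sendTimeout.Token).ConfigureAwait(false);
        }

        if (waitForAck == null)
        {
            return default(TAck);
        }

        // Ack phase: a fresh timeout starts once the send has completed.
        using (var ackTimeout = new CancellationTokenSource(timeout))
        {
            return await waitForAck(ackTimeout.Token).ConfigureAwait(false);
        }
    }
}
```

In this sketch the time spent waiting on `connected` never counts against `timeout`, which is the property that stops queued requests from silently timing out during a slow connect.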

Statistics Heartbeat

An initial rollout of statistics tracking has been added to the driver. The driver has a heartbeat that tracks internal driver traffic over a period of time. Snapshots are taken every 5 seconds and the data is stored in a summarized, capped collection. Because the collection is capped, all averages are calculated over a moving window of time.
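To show why a capped collection yields moving-window averages, here is a minimal sketch. It assumes a simple fixed-capacity queue of numeric samples; `CappedSampleWindow` is a hypothetical class written for this example and is not the driver's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of a capped collection; not taken from the driver.
public sealed class CappedSampleWindow
{
    private readonly Queue<double> _samples = new Queue<double>();
    private readonly int _capacity;

    public CappedSampleWindow(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Adds a snapshot value, evicting the oldest one once the cap is reached.
    public void Add(double sample)
    {
        if (_samples.Count == _capacity)
        {
            _samples.Dequeue();
        }
        _samples.Enqueue(sample);
    }

    // Averages only the samples still in the window, so old data ages out.
    public double Average()
    {
        return _samples.Count == 0 ? 0.0 : _samples.Average();
    }
}
```

With a capacity of 3, adding 10, 20, 30 and then 40 evicts the 10, so `Average()` returns 30. With one snapshot every 5 seconds, a cap of N samples corresponds to a window of roughly 5 × N seconds.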

These stats are definitely a work in progress and will change a lot. However, they are extremely useful for tracking down bottlenecks in code and for watching the progress of producers. Here is some sample code. Please note that the event is located on a static object called StatisticsTracker; this is done on purpose to externalize the testing of this feature, and it will move to a proper place in the future.

// Requires: using System; using System.Linq;
// _producer is a field holding the producer instance being monitored.

// Subscribe the handler below to the heartbeat event.
StatisticsTracker.OnStatisticsHeartbeat += StatisticsTracker_OnStatisticsHeartbeat;

void StatisticsTracker_OnStatisticsHeartbeat(StatisticsSummary stats)
{
    // Producer-level counters and produce request throughput.
    Console.WriteLine("Producer: Buffer: {0} AsyncQueued: {1}", _producer.BufferCount, _producer.AsyncCount);
    Console.WriteLine("Produced: Msgs: {0} New/s: {1}  MsgKilobytes/s: {2} PayloadKilobytes/s: {3} CompressionRatio: {4}",
        stats.ProduceRequestSummary.MessageCount,
        stats.ProduceRequestSummary.MessagesPerSecond,
        stats.ProduceRequestSummary.MessageKilobytesPerSecond,
        stats.ProduceRequestSummary.PayloadKilobytesPerSecond,
        stats.ProduceRequestSummary.AverageCompressionRatio);

    // Per-endpoint queue and TCP write statistics.
    stats.NetworkWriteSummaries.ForEach(s =>
    {
        Console.WriteLine("Endpoint: {0}", s.Endpoint);
        if (s.QueueSummary != null)
        {
            Console.WriteLine("Q = Messages: {0}, Q Kilobytes: {1}, OldestInQueue:{2},  BatchCount: {3}",
                s.QueueSummary.QueuedMessages,
                s.QueueSummary.KilobytesQueued,
                s.QueueSummary.OldestBatchInQueue.TotalMilliseconds,
                s.QueueSummary.QueuedBatchCount);
        }

        if (s.TcpSummary != null)
        {
            Console.WriteLine("C = Msg/s: {0},  Last: {1},  Kilobytes/s: {2}, AvgTcpMS:{3} AvgTotalMS: {4} Async: {5}",
                s.TcpSummary.MessagesPerSecond,
                s.TcpSummary.MessagesLastBatch,
                s.TcpSummary.KilobytesPerSecond,
                s.TcpSummary.AverageWriteDuration.TotalMilliseconds,
                s.TcpSummary.AverageTotalDuration.TotalMilliseconds,
                stats.Gauges.ActiveWriteOperation);
        }
    });

    // Totals across all endpoints that have TCP statistics.
    var tcpSummaries = stats.NetworkWriteSummaries
        .Where(x => x.TcpSummary != null)
        .Select(x => x.TcpSummary)
        .ToList();

    Console.WriteLine("Upload Rate: Msg/s: {0}  Kilobytes/s: {1}  Max Msg/s: {2}  Last Batch: {3}",
        tcpSummaries.Sum(t => t.MessagesPerSecond),
        tcpSummaries.Sum(t => t.KilobytesPerSecond),
        tcpSummaries.Sum(t => t.MaxMessagesPerSecond),
        tcpSummaries.Sum(t => t.MessagesLastBatch));

    Console.WriteLine("");
}