Skip to content

Commit

Permalink
add --follow / -f flag to start consuming from HEAD-1 offset
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Nov 21, 2018
1 parent b66ee6c commit e94dd35
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions cmd/kaf/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"os"
"sync"

"github.com/birdayz/sarama"
Expand All @@ -11,13 +12,15 @@ import (

var (
offsetFlag string
prettyPrint bool
raw bool
follow bool
)

func init() {
rootCmd.AddCommand(consumeCmd)
consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest. Default: newest")
consumeCmd.Flags().BoolVar(&prettyPrint, "pretty", false, "Pretty print output if possible, e.g. for JSON.")
consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON")
consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Shorthand to start consuming with offset HEAD-1 on each partition. Overrides --offset flag")
}

var consumeCmd = &cobra.Command{
Expand Down Expand Up @@ -61,6 +64,25 @@ var consumeCmd = &cobra.Command{
wg.Add(1)

go func(partition int32) {
req := &sarama.OffsetRequest{
Version: int16(1),
}
req.AddBlock(topic, partition, int64(-1), int32(0))
ldr, err := client.Leader(topic, partition)
if err != nil {
panic(err)
}
offsets, err := ldr.GetAvailableOffsets(req)
if err != nil {
panic(err)
}
followOffset := offsets.GetBlock(topic, partition).Offset - 1

if follow && followOffset > 0 {
offset = followOffset
fmt.Fprintf(os.Stderr, "Starting on partition %v with offet %v\n", partition, offset)
}

pc, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
panic(err)
Expand All @@ -70,14 +92,14 @@ var consumeCmd = &cobra.Command{

dataToDisplay := msg.Value

if prettyPrint {
if !raw {
formatted, err := prettyjson.Format(msg.Value)
if err == nil {
dataToDisplay = formatted
}
}

if msg.Key != nil && len(msg.Key) > 0 {
if msg.Key != nil && len(msg.Key) > 0 && !raw {
fmt.Printf("%v %v\n", string(msg.Key), string(dataToDisplay))
} else {
fmt.Printf("%v\n", string(dataToDisplay))
Expand Down

0 comments on commit e94dd35

Please sign in to comment.