diff --git a/cmd/kaf/consume.go b/cmd/kaf/consume.go index 866543ea..2fb27c79 100644 --- a/cmd/kaf/consume.go +++ b/cmd/kaf/consume.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "os" "sync" "github.com/birdayz/sarama" @@ -11,13 +12,15 @@ import ( var ( offsetFlag string - prettyPrint bool + raw bool + follow bool ) func init() { rootCmd.AddCommand(consumeCmd) consumeCmd.Flags().StringVar(&offsetFlag, "offset", "oldest", "Offset to start consuming. Possible values: oldest, newest. Default: newest") - consumeCmd.Flags().BoolVar(&prettyPrint, "pretty", false, "Pretty print output if possible, e.g. for JSON.") + consumeCmd.Flags().BoolVar(&raw, "raw", false, "Print raw output of messages, without key or prettified JSON") + consumeCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Shorthand to start consuming with offset HEAD-1 on each partition. Overrides --offset flag") } var consumeCmd = &cobra.Command{ @@ -61,6 +64,25 @@ var consumeCmd = &cobra.Command{ wg.Add(1) go func(partition int32) { + req := &sarama.OffsetRequest{ + Version: int16(1), + } + req.AddBlock(topic, partition, int64(-1), int32(0)) + ldr, err := client.Leader(topic, partition) + if err != nil { + panic(err) + } + offsets, err := ldr.GetAvailableOffsets(req) + if err != nil { + panic(err) + } + followOffset := offsets.GetBlock(topic, partition).Offset - 1 + + if follow && followOffset > 0 { + offset = followOffset + fmt.Fprintf(os.Stderr, "Starting on partition %v with offet %v\n", partition, offset) + } + pc, err := consumer.ConsumePartition(topic, partition, offset) if err != nil { panic(err) @@ -70,14 +92,14 @@ var consumeCmd = &cobra.Command{ dataToDisplay := msg.Value - if prettyPrint { + if !raw { formatted, err := prettyjson.Format(msg.Value) if err == nil { dataToDisplay = formatted } } - if msg.Key != nil && len(msg.Key) > 0 { + if msg.Key != nil && len(msg.Key) > 0 && !raw { fmt.Printf("%v %v\n", string(msg.Key), string(dataToDisplay)) } else { fmt.Printf("%v\n", string(dataToDisplay))