Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Sequence number should be outside of message converter (#327)
Browse files Browse the repository at this point in the history
* Sequence number should be outside of message converter

* set all of these members (c.Message, c.Message.Message, etc) inline .. as part of initialization in line 131 itself

* update kafka url
  • Loading branch information
thuningxu authored Nov 20, 2017
1 parent 65a1bb1 commit cb4e93e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ before_install:
- pip install --user ccm
- pip install --user cqlsh==5.0.3
- sudo apt-get install gcc g++ -y
- wget http://www.us.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz -O kafka.tgz
- wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz -O kafka.tgz
- mkdir -p kafka && tar xzf kafka.tgz -C kafka --strip-components 1
- nohup bash -c "cd kafka && bin/zookeeper-server-start.sh config/zookeeper.properties &"
- nohup bash -c "cd kafka && bin/kafka-server-start.sh config/server.properties &"
Expand Down
69 changes: 36 additions & 33 deletions services/outputhost/kafkaStream.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,20 @@ func (k *kafkaStream) Read() (*store.ReadMessageContent, error) {
return nil, errKafkaClosed
}
k.creditSemaphore.Acquire(1) // TODO: Size-based credits
return k.kafkaConverter(m), nil
c := k.kafkaConverter(m)
c.Message.Message.SequenceNumber = common.Int64Ptr(k.getNextSequenceNumber())
return c, nil
}

// ResponseHeaders returns the response headers sent from the server. This will block until server headers have been received.
func (k *kafkaStream) ResponseHeaders() (map[string]string, error) {
return nil, errors.New(`unimplemented`)
}

func (k *kafkaStream) getNextSequenceNumber() int64 {
return atomic.AddInt64(&k.seqNo, 1)
}

/*
* Setup & Utility
*/
Expand All @@ -114,55 +120,52 @@ func OpenKafkaStream(c <-chan *s.ConsumerMessage, kafkaMessageConverter KafkaMes
kafkaConverter: kafkaMessageConverter,
}
if k.kafkaConverter == nil {
k.kafkaConverter = GetDefaultKafkaMessageConverter(&k.seqNo, k.logger)
k.kafkaConverter = GetDefaultKafkaMessageConverter(k.logger)
}
return k
}

// GetDefaultKafkaMessageConverter returns the default kafka message converter
func GetDefaultKafkaMessageConverter(seqNo *int64, logger bark.Logger) KafkaMessageConverter {
func GetDefaultKafkaMessageConverter(logger bark.Logger) KafkaMessageConverter {
return func(m *s.ConsumerMessage) (c *store.ReadMessageContent) {
c = &store.ReadMessageContent{
Type: store.ReadMessageContentTypePtr(store.ReadMessageContentType_MESSAGE),
}

c.Message = &store.ReadMessage{
Address: common.Int64Ptr(
int64(kafkaAddresser.GetStoreAddress(
&TopicPartition{
Topic: m.Topic,
Partition: m.Partition,
},
m.Offset,
func() bark.Logger {
return logger.WithFields(bark.Fields{
`module`: `kafkaStream`,
Message: &store.ReadMessage{
Address: common.Int64Ptr(
int64(kafkaAddresser.GetStoreAddress(
&TopicPartition{
Topic: m.Topic,
Partition: m.Partition,
},
m.Offset,
func() bark.Logger {
return logger.WithFields(bark.Fields{
`module`: `kafkaStream`,
`topic`: m.Topic,
`partition`: m.Partition,
})
},
))),
Message: &store.AppendMessage{
Payload: &cherami.PutMessage{
Data: m.Value,
UserContext: map[string]string{
`key`: string(m.Key),
`topic`: m.Topic,
`partition`: m.Partition,
})
`partition`: strconv.Itoa(int(m.Partition)),
`offset`: strconv.Itoa(int(m.Offset)),
},
// TODO: Checksum?
},
))),
}

c.Message.Message = &store.AppendMessage{
SequenceNumber: common.Int64Ptr(atomic.AddInt64(seqNo, 1)),
},
},
}

if !m.Timestamp.IsZero() {
// only set if kafka is version 0.10+
c.Message.Message.EnqueueTimeUtc = common.Int64Ptr(m.Timestamp.UnixNano())
}

c.Message.Message.Payload = &cherami.PutMessage{
Data: m.Value,
UserContext: map[string]string{
`key`: string(m.Key),
`topic`: m.Topic,
`partition`: strconv.Itoa(int(m.Partition)),
`offset`: strconv.Itoa(int(m.Offset)),
},
// TODO: Checksum?
}
return c
}
}

0 comments on commit cb4e93e

Please sign in to comment.