-
Notifications
You must be signed in to change notification settings - Fork 435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement publisher subscriber library using redis streams [NIT-2319] #2196
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initial
pubsub/consumer.go
Outdated
var DefaultConsumerConfig = &ConsumerConfig{ | ||
ResponseEntryTimeout: time.Hour, | ||
KeepAliveTimeout: 5 * time.Minute, | ||
RedisStream: "default", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for both producer and consumer:
A single redis server should have multiple streams. To avoid confusion/misconfiguration - default should be empty, and producer/consumer should fail to init with empty string (like redisURL)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Did the same for RedisGroups as well, I assume same applies there.
pubsub/producer.go
Outdated
if err != nil { | ||
return false | ||
} | ||
return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redis timeout will delete the value and that is good enough for us.
In the race condition that we detect consumer death only on the next loop - it's o.k. to be a little late
on the other hand, we don't want to mistakingly consider a server dead because there's some offset between it's local time and our local time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
pubsub/producer.go
Outdated
} | ||
|
||
// Unmarshal converts a JSON byte slice back to the generic type object. | ||
func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need Marshaller at all. You can just call json.Marshal/Unmarshal directly from producer/consumer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
approved with comments |
No description provided.