Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement publisher subscriber library using redis streams [NIT-2319] #2196

Merged
merged 48 commits into from
Apr 16, 2024

Conversation

anodar
Copy link
Contributor

@anodar anodar commented Mar 18, 2024

No description provided.

@cla-bot cla-bot bot added the s Automatically added by the CLA bot if the creator of a PR is registered as having signed the CLA. label Mar 18, 2024
@anodar anodar changed the title Implement publisher subscriber library using redis streams Implement publisher subscriber library using redis streams [NIT-2319] Mar 18, 2024
@anodar anodar marked this pull request as ready for review March 18, 2024 22:32
Copy link
Contributor

@tsahee tsahee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initial

pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
@anodar anodar requested a review from tsahee March 26, 2024 20:07
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/consumer.go Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
@anodar anodar requested a review from tsahee March 27, 2024 17:30
@anodar anodar requested a review from tsahee April 4, 2024 16:02
pubsub/producer.go Outdated Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
pubsub/producer.go Show resolved Hide resolved
pubsub/producer.go Show resolved Hide resolved
pubsub/pubsub_test.go Outdated Show resolved Hide resolved
pubsub/pubsub_test.go Outdated Show resolved Hide resolved
pubsub/pubsub_test.go Outdated Show resolved Hide resolved
pubsub/pubsub_test.go Show resolved Hide resolved
pubsub/pubsub_test.go Outdated Show resolved Hide resolved
pubsub/pubsub_test.go Outdated Show resolved Hide resolved
pubsub/consumer.go Outdated Show resolved Hide resolved
pubsub/producer.go Outdated Show resolved Hide resolved
@anodar anodar requested a review from tsahee April 12, 2024 09:25
tsahee
tsahee previously approved these changes Apr 12, 2024
var DefaultConsumerConfig = &ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "default",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for both producer and consumer:
A single redis server should have multiple streams. To avoid confusion/misconfiguration - default should be empty, and producer/consumer should fail to init with empty string (like redisURL)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Did the same for RedisGroups as well, I assume same applies there.

if err != nil {
return false
}
return time.Now().UnixMilli()-val < int64(p.cfg.KeepAliveTimeout.Milliseconds())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis timeout will delete the value and that is good enough for us.
In the race condition that we detect consumer death only on the next loop - it's o.k. to be a little late
on the other hand, we don't want to mistakingly consider a server dead because there's some offset between it's local time and our local time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// Unmarshal converts a JSON byte slice back to the generic type object.
func (j jsonMarshaller[T]) Unmarshal(val []byte) (T, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need Marshaller at all. You can just call json.Marshal/Unmarshal directly from producer/consumer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@tsahee
Copy link
Contributor

tsahee commented Apr 12, 2024

approved with comments

@anodar anodar requested a review from tsahee April 15, 2024 08:13
@anodar anodar enabled auto-merge April 15, 2024 08:13
@anodar anodar merged commit 1dd4fb3 into master Apr 16, 2024
8 checks passed
@anodar anodar deleted the redis-stream branch April 16, 2024 15:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
s Automatically added by the CLA bot if the creator of a PR is registered as having signed the CLA.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants