Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement publisher subscriber library using redis streams [NIT-2319] #2196

Merged
merged 48 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
4676a7c
Implement publisher subscriber library using redis streams
anodar Mar 18, 2024
d071fe1
Merge branch 'master' into redis-stream
anodar Mar 18, 2024
7abd266
Fix lint
anodar Mar 19, 2024
169092e
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Mar 19, 2024
651f26a
Fix tests
anodar Mar 19, 2024
3ae8a72
Address comments
anodar Mar 21, 2024
b28f3ac
Implement config structs for producer/consumer
anodar Mar 22, 2024
675c1c2
Drop commented out code, fix test
anodar Mar 25, 2024
0f43f60
Use stopwaiter instead of go primitives
anodar Mar 26, 2024
860d632
Merge branch 'master' into redis-stream
anodar Mar 26, 2024
046fb25
Fix linter error
anodar Mar 26, 2024
2bd73e7
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Mar 26, 2024
a21e46a
Drop logging in tests
anodar Mar 26, 2024
07e4efe
Address comments
anodar Mar 27, 2024
79411f9
Drop commented out code
anodar Mar 27, 2024
862289c
Use redisutil package for creating redis client
anodar Mar 27, 2024
84e33f2
Merge branch 'master' into redis-stream
anodar Mar 27, 2024
2600022
Implement returning responses as container.Promise
anodar Mar 27, 2024
7c163bd
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Mar 27, 2024
eb6e63b
Add generics to the producer/consumer
anodar Mar 27, 2024
f94c454
Simplify tests
anodar Mar 27, 2024
99b9939
Fix linter error
anodar Mar 27, 2024
b183881
Drop remnant test
anodar Mar 27, 2024
2a67624
Address comments
anodar Apr 1, 2024
dcc0e88
Merge branch 'master' into redis-stream
anodar Apr 1, 2024
0455d93
Address comments
anodar Apr 2, 2024
ccece5d
Merge branch 'master' into redis-stream
anodar Apr 2, 2024
378906e
Change Info to Trace
anodar Apr 2, 2024
33fae88
Ignore messages not produced by this producer
anodar Apr 2, 2024
247a985
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Apr 2, 2024
5b5f709
Address data race
anodar Apr 2, 2024
0bd347e
Implement option to error out on failed requests instead of requeuein…
anodar Apr 4, 2024
c8101c2
Change generics to be any instead of Marshallable, introduce generic …
anodar Apr 4, 2024
972b030
Drop glogger in tests
anodar Apr 4, 2024
1edbd68
Drop remnant code
anodar Apr 4, 2024
0db255f
Make tests parallel
anodar Apr 4, 2024
9d450af
Fix data race
anodar Apr 4, 2024
d0e28b7
Merge branch 'master' into redis-stream
anodar Apr 5, 2024
8da1e86
Cleanup tests
anodar Apr 5, 2024
5c52884
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Apr 5, 2024
590ec7b
Address comments
anodar Apr 9, 2024
49ca219
Merge branch 'master' into redis-stream
anodar Apr 9, 2024
6b24516
Drop generic marshaller, implement jsonMarshaller instead
anodar Apr 12, 2024
0ebca82
Merge branch 'master' into redis-stream
anodar Apr 12, 2024
92a7e3d
drop generic marshaller
anodar Apr 15, 2024
ef0f40a
Merge branch 'master' into redis-stream
anodar Apr 15, 2024
0180a2b
don't set redis group/stream name in test config either
anodar Apr 15, 2024
bdc0940
Merge branch 'redis-stream' of github.com:OffchainLabs/nitro into red…
anodar Apr 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace github.com/ethereum/go-ethereum => ./go-ethereum
require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/alicebob/miniredis/v2 v2.21.0
github.com/alicebob/miniredis/v2 v2.32.1
github.com/andybalholm/brotli v1.0.4
github.com/aws/aws-sdk-go-v2 v1.16.4
github.com/aws/aws-sdk-go-v2/config v1.15.5
Expand Down Expand Up @@ -262,7 +262,7 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 // indirect
Expand Down Expand Up @@ -319,7 +319,7 @@ require (
github.com/go-redis/redis/v8 v8.11.4
github.com/go-stack/stack v1.8.1 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.21.0 h1:CdmwIlKUWFBDS+4464GtQiQ0R1vpzOgu4Vnd74rBL7M=
github.com/alicebob/miniredis/v2 v2.21.0/go.mod h1:XNqvJdQJv5mSuVMc0ynneafpnL/zv52acZ6kqeS0t88=
github.com/alicebob/miniredis/v2 v2.32.1 h1:Bz7CciDnYSaa0mX5xODh6GUITRSx+cVhjNoOR4JssBo=
github.com/alicebob/miniredis/v2 v2.32.1/go.mod h1:AqkLNAfUm0K07J28hnAyyQKf/x0YkCY/g5DCtuL01Mw=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
Expand Down Expand Up @@ -1688,8 +1688,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
Expand Down
177 changes: 177 additions & 0 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package pubsub

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/offchainlabs/nitro/util/redisutil"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
)

type ConsumerConfig struct {
// Timeout of result entry in Redis.
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// Duration after which consumer is considered to be dead if heartbeat
// is not updated.
KeepAliveTimeout time.Duration `koanf:"keepalive-timeout"`
tsahee marked this conversation as resolved.
Show resolved Hide resolved
tsahee marked this conversation as resolved.
Show resolved Hide resolved
// Redis url for Redis streams and locks.
RedisURL string `koanf:"redis-url"`
// Redis stream name.
RedisStream string `koanf:"redis-stream"`
// Redis consumer group name.
RedisGroup string `koanf:"redis-group"`
}

var DefaultConsumerConfig = &ConsumerConfig{
ResponseEntryTimeout: time.Hour,
KeepAliveTimeout: 5 * time.Minute,
RedisStream: "",
RedisGroup: "",
}

var DefaultTestConsumerConfig = &ConsumerConfig{
RedisStream: "",
RedisGroup: "",
ResponseEntryTimeout: time.Minute,
KeepAliveTimeout: 30 * time.Millisecond,
}

func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
f.Duration(prefix+".keepalive-timeout", DefaultConsumerConfig.KeepAliveTimeout, "timeout after which consumer is considered inactive if heartbeat wasn't performed")
f.String(prefix+".redis-url", DefaultConsumerConfig.RedisURL, "redis url for redis stream")
f.String(prefix+".redis-stream", DefaultConsumerConfig.RedisStream, "redis stream name to read from")
f.String(prefix+".redis-group", DefaultConsumerConfig.RedisGroup, "redis stream consumer group name")
}
tsahee marked this conversation as resolved.
Show resolved Hide resolved

// Consumer implements a consumer for redis stream provides heartbeat to
// indicate it is alive.
type Consumer[Request any, Response any] struct {
stopwaiter.StopWaiter
id string
client redis.UniversalClient
cfg *ConsumerConfig
}

type Message[Request any] struct {
ID string
Value Request
}

func NewConsumer[Request any, Response any](ctx context.Context, cfg *ConsumerConfig) (*Consumer[Request, Response], error) {
if cfg.RedisURL == "" {
return nil, fmt.Errorf("redis url cannot be empty")
}
if cfg.RedisStream == "" {
return nil, fmt.Errorf("redis stream name cannot be empty")
}
if cfg.RedisGroup == "" {
return nil, fmt.Errorf("redis group name cannot be emtpy")
}
c, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
return nil, err
}
consumer := &Consumer[Request, Response]{
id: uuid.NewString(),
client: c,
cfg: cfg,
}
return consumer, nil
}

// Start starts the consumer to iteratively perform heartbeat in configured intervals.
func (c *Consumer[Request, Response]) Start(ctx context.Context) {
c.StopWaiter.Start(ctx, c)
c.StopWaiter.CallIteratively(
func(ctx context.Context) time.Duration {
c.heartBeat(ctx)
return c.cfg.KeepAliveTimeout / 10
},
)
}

func (c *Consumer[Request, Response]) StopAndWait() {
c.StopWaiter.StopAndWait()
}

func heartBeatKey(id string) string {
return fmt.Sprintf("consumer:%s:heartbeat", id)
}

func (c *Consumer[Request, Response]) heartBeatKey() string {
return heartBeatKey(c.id)
}

// heartBeat updates the heartBeat key indicating aliveness.
func (c *Consumer[Request, Response]) heartBeat(ctx context.Context) {
if err := c.client.Set(ctx, c.heartBeatKey(), time.Now().UnixMilli(), 2*c.cfg.KeepAliveTimeout).Err(); err != nil {
l := log.Info
if ctx.Err() != nil {
l = log.Error
}
l("Updating heardbeat", "consumer", c.id, "error", err)
}
}

// Consumer first checks it there exists pending message that is claimed by
// unresponsive consumer, if not then reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
res, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: c.cfg.RedisGroup,
Consumer: c.id,
// Receive only messages that were never delivered to any other consumer,
// that is, only new messages.
Streams: []string{c.cfg.RedisStream, ">"},
Count: 1,
Block: time.Millisecond, // 0 seems to block the read instead of immediately returning
}).Result()
if errors.Is(err, redis.Nil) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("reading message for consumer: %q: %w", c.id, err)
}
if len(res) != 1 || len(res[0].Messages) != 1 {
return nil, fmt.Errorf("redis returned entries: %+v, for querying single message", res)
}
log.Debug(fmt.Sprintf("Consumer: %s consuming message: %s", c.id, res[0].Messages[0].ID))
var (
value = res[0].Messages[0].Values[messageKey]
data, ok = (value).(string)
)
if !ok {
return nil, fmt.Errorf("casting request to string: %w", err)
}
var req Request
if err := json.Unmarshal([]byte(data), &req); err != nil {
return nil, fmt.Errorf("unmarshaling value: %v, error: %w", value, err)
}

return &Message[Request]{
ID: res[0].Messages[0].ID,
Value: req,
}, nil
}

func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID string, result Response) error {
resp, err := json.Marshal(result)
if err != nil {
return fmt.Errorf("marshaling result: %w", err)
}
acquired, err := c.client.SetNX(ctx, messageID, resp, c.cfg.ResponseEntryTimeout).Result()
if err != nil || !acquired {
return fmt.Errorf("setting result for message: %v, error: %w", messageID, err)
}
if _, err := c.client.XAck(ctx, c.cfg.RedisStream, c.cfg.RedisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
return nil
}
Loading
Loading