From 51104869a19daa086b34881a7f7eb94e7e9ae5b2 Mon Sep 17 00:00:00 2001 From: lappely | Kirill Gnapovsky <82707867+poipoiPIO@users.noreply.github.com> Date: Sat, 14 Sep 2024 19:14:40 +0300 Subject: [PATCH 1/4] feat: add support of Bitmap type DUMP/RESTORE command support (#2535) --- src/storage/rdb.cc | 15 ++++++++++++++- tests/gocase/unit/dump/dump_test.go | 18 ++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/storage/rdb.cc b/src/storage/rdb.cc index 43bad3a022d..544e679ff64 100644 --- a/src/storage/rdb.cc +++ b/src/storage/rdb.cc @@ -29,7 +29,10 @@ #include "rdb_listpack.h" #include "rdb_ziplist.h" #include "rdb_zipmap.h" +#include "storage/redis_metadata.h" #include "time_util.h" +#include "types/redis_bitmap.h" +#include "types/redis_bitmap_string.h" #include "types/redis_hash.h" #include "types/redis_list.h" #include "types/redis_set.h" @@ -718,7 +721,7 @@ Status RDB::Dump(const std::string &key, const RedisType type) { Status RDB::SaveObjectType(const RedisType type) { int robj_type = -1; - if (type == kRedisString) { + if (type == kRedisString || type == kRedisBitmap) { robj_type = RDBTypeString; } else if (type == kRedisHash) { robj_type = RDBTypeHash; @@ -781,6 +784,16 @@ Status RDB::SaveObject(const std::string &key, const RedisType type) { } return SaveHashObject(field_values); + } else if (type == kRedisBitmap) { + std::string value; + redis::Bitmap bitmap_db(storage_, ns_); + Config *config = storage_->GetConfig(); + uint32_t max_btos_size = static_cast(config->max_bitmap_to_string_mb) * MiB; + auto s = bitmap_db.GetString(ctx, key, max_btos_size, &value); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + return SaveStringObject(value); } else { LOG(WARNING) << "Invalid or Not supported object type: " << type; return {Status::NotOK, "Invalid or Not supported object type"}; diff --git a/tests/gocase/unit/dump/dump_test.go b/tests/gocase/unit/dump/dump_test.go index bca9300d1ea..154891c3f07 100644 --- a/tests/gocase/unit/dump/dump_test.go +++ b/tests/gocase/unit/dump/dump_test.go @@ -149,3 +149,21 @@ func TestDump_Set(t *testing.T) { require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err()) require.ElementsMatch(t, members, rdb.SMembers(ctx, restoredKey).Val()) } + +func TestDump_Bitset(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + key := "bitsetKey1" + require.NoError(t, rdb.SetBit(ctx, key, 1, 1).Err()) + serialized, err := rdb.Dump(ctx, key).Result() + require.NoError(t, err) + + restoredKey := fmt.Sprintf("restore_%s", key) + require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err()) + require.Equal(t, rdb.Get(ctx, key).Val(), rdb.Get(ctx, restoredKey).Val()) +} From 28fc474b2aa63410dbf9da0c03a4f987fdb518b1 Mon Sep 17 00:00:00 2001 From: Aleks Lozovyuk Date: Sun, 15 Sep 2024 14:50:51 +0300 Subject: [PATCH 2/4] chore: bump cpptrace to v0.7.1 (#2536) Co-authored-by: Twice --- cmake/cpptrace.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/cpptrace.cmake b/cmake/cpptrace.cmake index fedb6fd33d0..0357f41ba2c 100644 --- a/cmake/cpptrace.cmake +++ b/cmake/cpptrace.cmake @@ -20,8 +20,8 @@ include_guard() include(cmake/utils.cmake) FetchContent_DeclareGitHubWithMirror(cpptrace - jeremy-rifkin/cpptrace v0.7.0 - MD5=d897c48f5bf96134109f7e6716f2fd31 + jeremy-rifkin/cpptrace v0.7.1 + MD5=8b62f5d3033ab59146cb1fd3ca89d859 ) if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace") From 399961dd847faf0512b5cddb24bdce2977b07796 Mon Sep 17 00:00:00 2001 From: Aleks Lozovyuk Date: Sun, 15 Sep 2024 16:47:52 +0300 Subject: [PATCH 3/4] chore: bump PEGTL to v3.2.8 (#2537) Co-authored-by: Twice --- cmake/pegtl.cmake | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/pegtl.cmake b/cmake/pegtl.cmake index d1638f365c1..6e4b581f24b 100644 --- a/cmake/pegtl.cmake +++ b/cmake/pegtl.cmake @@ -20,8 +20,8 @@ include_guard() include(cmake/utils.cmake) FetchContent_DeclareGitHubTarWithMirror(pegtl - taocpp/PEGTL 3.2.7 - MD5=31b14660c883bc0489ddcdfbd29199c9 + taocpp/PEGTL 3.2.8 + MD5=50339029d1bb037909b28c382214033e ) FetchContent_MakeAvailableWithArgs(pegtl) From 89c5b2419bc66e35c5a2ff0c2d5d581033486ae4 Mon Sep 17 00:00:00 2001 From: Jonathan Chen <86070045+jonathanc-n@users.noreply.github.com> Date: Mon, 16 Sep 2024 20:36:54 -0400 Subject: [PATCH 4/4] chore(tests): add basic tests for the stream consumer group (#2533) --- tests/gocase/unit/type/stream/stream_test.go | 108 +++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 943127fcaaa..b50b230f6c0 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) { require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) }) + t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) { + streamName := "test-stream-2478" + groupName := "test-group-2478" + consumerName := "test-consumer-2478" + + rdb.Del(ctx, streamName) + rdb.XGroupDestroy(ctx, streamName, groupName) + + for i := 1; i <= 5; i++ { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: fmt.Sprintf("%d-0", i), + Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)}, + }).Err()) + } + + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 5, + }).Result() + require.NoError(t, err) + require.Len(t, r[0].Messages, 5) + + time.Sleep(2 * time.Second) + + consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + var consumerInfo redis.XInfoConsumer + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.True(t, consumerInfo.Idle >= 2000) + require.Equal(t, int64(5), consumerInfo.Pending) + + ackIDs := make([]string, 5) + for i := 1; i <= 5; i++ { + ackIDs[i-1] = fmt.Sprintf("%d-0", i) + } + require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err()) + + consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.Equal(t, int64(0), consumerInfo.Pending) + }) + + t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) { + streamName := "stream-test-2478" + groupName := "group-test-2478" + consumerName := "consumer-test-2478" + + rdb.Del(ctx, streamName) + rdb.XGroupDestroy(ctx, streamName, groupName) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "*", + Values: map[string]interface{}{"field": "value"}, + }).Err()) + + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + }).Result() + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + var consumerInfo redis.XInfoConsumer + for _, c := range consumers { + if c.Name == consumerName { + consumerInfo = c + break + } + } + + require.Equal(t, consumerName, consumerInfo.Name) + require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) + + consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result() + require.NoError(t, err) + + for _, c := range consumers { + require.NotEqual(t, consumerName, c.Name) + } + }) + t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { streamName := "test-stream" group := "group"