Skip to content

Commit

Permalink
Merge branch 'apache:unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
PokIsemaine committed Sep 17, 2024
2 parents adc497a + 89c5b24 commit fcfeec5
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmake/cpptrace.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(cpptrace
jeremy-rifkin/cpptrace v0.7.0
MD5=d897c48f5bf96134109f7e6716f2fd31
jeremy-rifkin/cpptrace v0.7.1
MD5=8b62f5d3033ab59146cb1fd3ca89d859
)

if (SYMBOLIZE_BACKEND STREQUAL "libbacktrace")
Expand Down
4 changes: 2 additions & 2 deletions cmake/pegtl.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubTarWithMirror(pegtl
taocpp/PEGTL 3.2.7
MD5=31b14660c883bc0489ddcdfbd29199c9
taocpp/PEGTL 3.2.8
MD5=50339029d1bb037909b28c382214033e
)

FetchContent_MakeAvailableWithArgs(pegtl)
15 changes: 14 additions & 1 deletion src/storage/rdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
#include "rdb_listpack.h"
#include "rdb_ziplist.h"
#include "rdb_zipmap.h"
#include "storage/redis_metadata.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_bitmap_string.h"
#include "types/redis_hash.h"
#include "types/redis_list.h"
#include "types/redis_set.h"
Expand Down Expand Up @@ -718,7 +721,7 @@ Status RDB::Dump(const std::string &key, const RedisType type) {

Status RDB::SaveObjectType(const RedisType type) {
int robj_type = -1;
if (type == kRedisString) {
if (type == kRedisString || type == kRedisBitmap) {
robj_type = RDBTypeString;
} else if (type == kRedisHash) {
robj_type = RDBTypeHash;
Expand Down Expand Up @@ -781,6 +784,16 @@ Status RDB::SaveObject(const std::string &key, const RedisType type) {
}

return SaveHashObject(field_values);
} else if (type == kRedisBitmap) {
std::string value;
redis::Bitmap bitmap_db(storage_, ns_);
Config *config = storage_->GetConfig();
uint32_t max_btos_size = static_cast<uint32_t>(config->max_bitmap_to_string_mb) * MiB;
auto s = bitmap_db.GetString(ctx, key, max_btos_size, &value);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
return SaveStringObject(value);
} else {
LOG(WARNING) << "Invalid or Not supported object type: " << type;
return {Status::NotOK, "Invalid or Not supported object type"};
Expand Down
18 changes: 18 additions & 0 deletions tests/gocase/unit/dump/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,21 @@ func TestDump_Set(t *testing.T) {
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.ElementsMatch(t, members, rdb.SMembers(ctx, restoredKey).Val())
}

func TestDump_Bitset(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

key := "bitsetKey1"
require.NoError(t, rdb.SetBit(ctx, key, 1, 1).Err())
serialized, err := rdb.Dump(ctx, key).Result()
require.NoError(t, err)

restoredKey := fmt.Sprintf("restore_%s", key)
require.NoError(t, rdb.RestoreReplace(ctx, restoredKey, 0, serialized).Err())
require.Equal(t, rdb.Get(ctx, key).Val(), rdb.Get(ctx, restoredKey).Val())
}
108 changes: 108 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,114 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
})

t.Run("XINFO Test idle time and pending messages, for issue #2478", func(t *testing.T) {
streamName := "test-stream-2478"
groupName := "test-group-2478"
consumerName := "test-consumer-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

for i := 1; i <= 5; i++ {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: fmt.Sprintf("%d-0", i),
Values: map[string]interface{}{"field": fmt.Sprintf("value%d", i)},
}).Err())
}

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
r, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 5,
}).Result()
require.NoError(t, err)
require.Len(t, r[0].Messages, 5)

time.Sleep(2 * time.Second)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.True(t, consumerInfo.Idle >= 2000)
require.Equal(t, int64(5), consumerInfo.Pending)

ackIDs := make([]string, 5)
for i := 1; i <= 5; i++ {
ackIDs[i-1] = fmt.Sprintf("%d-0", i)
}
require.NoError(t, rdb.XAck(ctx, streamName, groupName, ackIDs...).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, int64(0), consumerInfo.Pending)
})

t.Run("XINFO Test consumer removal and inactive time, for issue #2478", func(t *testing.T) {
streamName := "stream-test-2478"
groupName := "group-test-2478"
consumerName := "consumer-test-2478"

rdb.Del(ctx, streamName)
rdb.XGroupDestroy(ctx, streamName, groupName)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "*",
Values: map[string]interface{}{"field": "value"},
}).Err())

require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
}).Result()
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)

consumers, err := rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

var consumerInfo redis.XInfoConsumer
for _, c := range consumers {
if c.Name == consumerName {
consumerInfo = c
break
}
}

require.Equal(t, consumerName, consumerInfo.Name)
require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err())

consumers, err = rdb.XInfoConsumers(ctx, streamName, groupName).Result()
require.NoError(t, err)

for _, c := range consumers {
require.NotEqual(t, consumerName, c.Name)
}
})

t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"
Expand Down

0 comments on commit fcfeec5

Please sign in to comment.