Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cuckoo filter and most commands #2545

Closed
wants to merge 14 commits into from
447 changes: 447 additions & 0 deletions src/commands/cmd_cuckoo.cc

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum class CommandCategory : uint8_t {
Bit,
BloomFilter,
Cluster,
CuckooFilter,
Function,
Geo,
Hash,
Expand Down
71 changes: 70 additions & 1 deletion src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ bool Metadata::ExpireAt(uint64_t expired_ts) const {
bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() == kRedisJson; }

bool Metadata::IsEmptyableType() const {
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog;
return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog ||
Type() == kRedisCuckooFilter;
}

bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }
Expand Down Expand Up @@ -495,3 +496,71 @@ rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) {

return rocksdb::Status::OK();
}

void CuckooFilterMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);
PutFixed64(dst, capacity);
PutFixed16(dst, bucket_size);
PutFixed16(dst, max_iterations);
PutFixed32(dst, num_buckets);
PutFixed16(dst, expansion);
PutFixed16(dst, num_filters);
PutFixed64(dst, num_items);
PutFixed64(dst, num_deletes);
for (const auto &filter : filters) {
PutFixed16(dst, filter.bucket_size);
PutFixed64(dst, filter.num_buckets);
dst->append(filter.data.begin(), filter.data.end());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt the efficiency of this approach: everytime we read/update the filters, we need to retrieve/put all of these filters.

Please refer to the design of bloom filters to see if you can put filters to multiple key values.

}
}

rocksdb::Status CuckooFilterMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}
if (!GetFixed64(input, &capacity)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed16(input, &bucket_size)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed16(input, &max_iterations)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed32(input, &num_buckets)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed16(input, &expansion)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed16(input, &num_filters)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed64(input, &num_items)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed64(input, &num_deletes)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

filters.resize(num_filters);

for (size_t i = 0; i < num_filters; ++i) {
if (!GetFixed16(input, &filters[i].bucket_size)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
if (!GetFixed64(input, &filters[i].num_buckets)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}

size_t data_size = filters[i].bucket_size * filters[i].num_buckets;
filters[i].data.resize(data_size);

if (input->size() < data_size) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
memcpy(filters[i].data.data(), input->data(), data_size);
input->remove_prefix(data_size);
}
return rocksdb::Status::OK();
}
26 changes: 23 additions & 3 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>

#include "encoding.h"
#include "types/cuckoo.h"
#include "types/redis_stream_base.h"

constexpr bool USE_64BIT_COMMON_FIELD_DEFAULT = METADATA_ENCODING_VERSION != 0;
Expand All @@ -50,6 +51,7 @@ enum RedisType : uint8_t {
kRedisBloomFilter = 9,
kRedisJson = 10,
kRedisHyperLogLog = 11,
kRedisCuckooFilter = 12,
};

struct RedisTypes {
Expand Down Expand Up @@ -91,9 +93,9 @@ enum RedisCommand {
kRedisCmdLMove,
};

const std::vector<std::string> RedisTypeNames = {"none", "string", "hash", "list",
"set", "zset", "bitmap", "sortedint",
"stream", "MBbloom--", "ReJSON-RL", "hyperloglog"};
const std::vector<std::string> RedisTypeNames = {"none", "string", "hash", "list", "set",
"zset", "bitmap", "sortedint", "stream", "MBbloom--",
"ReJSON-RL", "hyperloglog", "cfilter"};

constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value";
constexpr const char *kErrMsgKeyExpired = "the key was expired";
Expand Down Expand Up @@ -335,3 +337,21 @@ class HyperLogLogMetadata : public Metadata {

EncodeType encode_type = EncodeType::DENSE;
};

class CuckooFilterMetadata : public Metadata {
public:
uint64_t capacity;
uint16_t bucket_size;
uint16_t max_iterations;
uint32_t num_buckets;
uint16_t expansion;
uint16_t num_filters;
uint64_t num_items;
uint64_t num_deletes;
std::vector<SubCF> filters;

explicit CuckooFilterMetadata(bool generate_version = true) : Metadata(kRedisCuckooFilter, generate_version) {}

void Encode(std::string *dst) const override;
rocksdb::Status Decode(Slice *input) override;
};
Loading