Merge branch 'unstable' into unstable
PokIsemaine authored Aug 3, 2024
2 parents 10042b6 + 0f5f18e commit 1f98cdb
Showing 24 changed files with 1,559 additions and 60 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/kvrocks.yaml
@@ -426,6 +426,12 @@ jobs:
- name: ArchLinux
image: archlinux:base
compiler: gcc
- name: Rocky Linux 8
image: rockylinux:8
compiler: gcc
- name: Rocky Linux 9
image: rockylinux:9
compiler: gcc

runs-on: ubuntu-22.04
container:
@@ -448,6 +454,30 @@ jobs:
update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-11 100
echo "NPROC=$(nproc)" >> $GITHUB_ENV
- name: Setup Rocky Linux 8
if: ${{ startsWith(matrix.image, 'rockylinux:8') }}
run: |
dnf install -y epel-release
dnf config-manager --set-enabled powertools
dnf install -y git gcc-toolset-12 autoconf automake libtool libstdc++-static python3 python3-pip openssl-devel which cmake
source /opt/rh/gcc-toolset-12/enable
update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
update-alternatives --install /usr/bin/cc cc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100
update-alternatives --install /usr/bin/c++ c++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
echo "NPROC=$(nproc)" >> $GITHUB_ENV
- name: Setup Rocky Linux 9
if: ${{ startsWith(matrix.image, 'rockylinux:9') }}
run: |
dnf install -y epel-release
dnf config-manager --set-enabled crb
dnf install -y git gcc-toolset-12 autoconf automake libtool libstdc++-static python3 python3-pip openssl-devel which cmake
source /opt/rh/gcc-toolset-12/enable
update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
update-alternatives --install /usr/bin/cc cc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100
update-alternatives --install /usr/bin/c++ c++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100
echo "NPROC=$(nproc)" >> $GITHUB_ENV
- name: Cache redis
id: cache-redis
uses: actions/cache@v3
14 changes: 7 additions & 7 deletions CMakeLists.txt
@@ -42,20 +42,20 @@ endif()
find_package(Backtrace REQUIRED)

if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
- if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 7)
- message(FATAL_ERROR "It is expected to build kvrocks with GCC 7 or above")
+ if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
+ message(FATAL_ERROR "It is expected to build kvrocks with GCC 8 or above")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
- if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 5)
- message(FATAL_ERROR "It is expected to build kvrocks with Clang 5 or above")
+ if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
+ message(FATAL_ERROR "It is expected to build kvrocks with Clang 8 or above")
endif()
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
- if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 10)
- message(FATAL_ERROR "It is expected to build kvrocks with Xcode toolchains 10 or above")
+ if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11)
+ message(FATAL_ERROR "It is expected to build kvrocks with Xcode toolchains 11 or above")
endif()
else()
message(WARNING "The compiler you are currently using is not officially supported,
- so you can try switching to GCC>=7 or Clang>=5 if you encounter problems")
+ so you can try switching to GCC>=8 or Clang>=8 if you encounter problems")
endif()

if(CMAKE_GENERATOR STREQUAL "Ninja")
103 changes: 103 additions & 0 deletions src/commands/cmd_hll.cc
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include <types/redis_hyperloglog.h>

#include <algorithm>

#include "commander.h"
#include "commands/command_parser.h"
#include "server/redis_reply.h"
#include "server/server.h"

namespace redis {

/// PFADD key [element [element ...]]
/// Complexity: O(1) for each element added.
class CommandPfAdd final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
// args_[0] is the command name, so the key is args_[1] and the elements start at args_[2].
std::vector<uint64_t> hashes(args_.size() - 2);
for (size_t i = 2; i < args_.size(); i++) {
hashes[i - 2] = redis::HyperLogLog::HllHash(args_[i]);
}
uint64_t ret{};
auto s = hll.Add(args_[1], hashes, &ret);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::Integer(ret);
return Status::OK();
}
};

/// PFCOUNT key [key ...]
/// Complexity: O(1) with a very small average constant time when called with a single key.
/// O(N) with N being the number of keys, and much bigger constant times,
/// when called with multiple keys.
class CommandPfCount final : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
uint64_t ret{};
rocksdb::Status s;
if (args_.size() > 2) {
// Skip args_[0] (the command name); every remaining argument is a user key.
std::vector<Slice> keys(args_.begin() + 1, args_.end());
s = hll.CountMultiple(keys, &ret);
} else {
s = hll.Count(args_[1], &ret);
}
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
if (s.IsNotFound()) {
ret = 0;
}
*output = redis::Integer(ret);
return Status::OK();
}
};

/// PFMERGE destkey [sourcekey [sourcekey ...]]
///
/// Complexity: O(N) to merge N HyperLogLogs, but with high constant times.
class CommandPfMerge final : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
// args_[1] is the destination key; the source keys start at args_[2].
std::vector<Slice> src_user_keys;
src_user_keys.reserve(args_.size() - 2);
for (size_t i = 2; i < args_.size(); i++) {
src_user_keys.emplace_back(args_[i]);
}
auto s = hll.Merge(/*dest_user_key=*/args_[1], src_user_keys);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::SimpleString("OK");
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandPfAdd>("pfadd", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandPfCount>("pfcount", -2, "read-only", 1, -1, 1),
MakeCmdAttr<CommandPfMerge>("pfmerge", -2, "write", 1, -1, 1), );

} // namespace redis
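
The three commands above delegate the actual work to redis::HyperLogLog (defined in types/redis_hyperloglog.h, not part of this diff). As a rough, self-contained illustration of the estimator idea behind them — not kvrocks' storage format, hash function, or bias corrections — a minimal sketch:

#include <algorithm>
#include <array>
#include <cmath>
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

struct SimpleHll {
  static constexpr int kPrecision = 14;                // 2^14 = 16384 registers, as in Redis
  static constexpr size_t kRegisters = size_t{1} << kPrecision;
  std::array<uint8_t, kRegisters> registers{};

  void Add(const std::string &element) {
    uint64_t h = std::hash<std::string>{}(element);    // stand-in for the real 64-bit hash
    size_t idx = h & (kRegisters - 1);                 // low bits pick one register
    uint64_t rest = (h >> kPrecision) | (uint64_t{1} << (64 - kPrecision));  // sentinel bit, so rest != 0
    uint8_t rank = static_cast<uint8_t>(__builtin_ctzll(rest) + 1);          // zero-run length + 1 (GCC/Clang builtin)
    registers[idx] = std::max(registers[idx], rank);   // O(1): only one register is touched per element
  }

  void Merge(const SimpleHll &other) {                 // PFMERGE: register-wise max
    for (size_t i = 0; i < kRegisters; ++i) registers[i] = std::max(registers[i], other.registers[i]);
  }

  double Estimate() const {                            // raw estimate, no small-range correction
    double sum = 0;
    for (uint8_t r : registers) sum += std::ldexp(1.0, -r);  // 2^-r
    double alpha = 0.7213 / (1 + 1.079 / kRegisters);
    return alpha * kRegisters * kRegisters / sum;
  }
};

int main() {
  SimpleHll a, b;
  for (int i = 0; i < 50000; ++i) a.Add("user:" + std::to_string(i));
  for (int i = 25000; i < 75000; ++i) b.Add("user:" + std::to_string(i));
  a.Merge(b);                                          // union of the two sets
  std::cout << "estimated cardinality: " << a.Estimate() << "\n";  // roughly 75000 (~1% error at this size)
}

PFADD raises at most one register per element, which is why it is O(1) per element, and PFMERGE is a register-wise max, which is why merging preserves the cardinality of the union.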
20 changes: 20 additions & 0 deletions src/search/common_transformer.h
@@ -105,6 +105,26 @@ struct TreeTransformer {

return result;
}

template <typename T = double>
static StatusOr<std::vector<T>> Binary2Vector(std::string_view str) {
if (str.size() % sizeof(T) != 0) {
return {Status::NotOK, "data size is not a multiple of the target type size"};
}

std::vector<T> values;
const size_t type_size = sizeof(T);
values.reserve(str.size() / type_size);

while (!str.empty()) {
T value;
memcpy(&value, str.data(), type_size);
values.push_back(value);
str.remove_prefix(type_size);
}

return values;
}
};

} // namespace kqir
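
Binary2Vector above assumes the input string holds the raw, host-endian bytes of consecutive T values and decodes them chunk by chunk with memcpy. A standalone sketch of that layout; the encode/decode pair below mirrors the loop in Binary2Vector, whereas inside kvrocks you would call kqir::TreeTransformer::Binary2Vector<double>(blob) and get back a StatusOr<std::vector<double>>:

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::vector<double> original = {1.0, 2.5, -3.75};
  // Encode: reinterpret the doubles as a byte string (what a client would pass as a binary parameter).
  std::string blob(reinterpret_cast<const char *>(original.data()), original.size() * sizeof(double));

  // Decode: the inverse, one sizeof(double)-sized chunk at a time, exactly as Binary2Vector does.
  std::vector<double> decoded;
  for (size_t off = 0; off + sizeof(double) <= blob.size(); off += sizeof(double)) {
    double v;
    std::memcpy(&v, blob.data() + off, sizeof(double));
    decoded.push_back(v);
  }
  for (double v : decoded) std::cout << v << ' ';  // prints: 1 2.5 -3.75
}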
71 changes: 70 additions & 1 deletion src/search/ir.h
@@ -229,6 +229,63 @@ struct NumericCompareExpr : BoolAtomExpr {
}
};

struct VectorLiteral : Literal {
std::vector<double> values;

explicit VectorLiteral(std::vector<double> &&values) : values(std::move(values)){};

std::string_view Name() const override { return "VectorLiteral"; }
std::string Dump() const override {
return fmt::format("[{}]", util::StringJoin(values, [](auto v) { return std::to_string(v); }));
}
std::string Content() const override { return Dump(); }

std::unique_ptr<Node> Clone() const override { return std::make_unique<VectorLiteral>(*this); }
};

struct VectorRangeExpr : BoolAtomExpr {
std::unique_ptr<FieldRef> field;
std::unique_ptr<NumericLiteral> range;
std::unique_ptr<VectorLiteral> vector;

VectorRangeExpr(std::unique_ptr<FieldRef> &&field, std::unique_ptr<NumericLiteral> &&range,
std::unique_ptr<VectorLiteral> &&vector)
: field(std::move(field)), range(std::move(range)), vector(std::move(vector)) {}

std::string_view Name() const override { return "VectorRangeExpr"; }
std::string Dump() const override {
return fmt::format("{} <-> {} < {}", field->Dump(), vector->Dump(), range->Dump());
}

std::unique_ptr<Node> Clone() const override {
return std::make_unique<VectorRangeExpr>(Node::MustAs<FieldRef>(field->Clone()),
Node::MustAs<NumericLiteral>(range->Clone()),
Node::MustAs<VectorLiteral>(vector->Clone()));
}
};

struct VectorKnnExpr : BoolAtomExpr {
// TODO: Support pre-filter for hybrid query
std::unique_ptr<FieldRef> field;
std::unique_ptr<NumericLiteral> k;
std::unique_ptr<VectorLiteral> vector;

VectorKnnExpr(std::unique_ptr<FieldRef> &&field, std::unique_ptr<NumericLiteral> &&k,
std::unique_ptr<VectorLiteral> &&vector)
: field(std::move(field)), k(std::move(k)), vector(std::move(vector)) {}

std::string_view Name() const override { return "VectorKnnExpr"; }
std::string Dump() const override {
return fmt::format("KNN k={}, {} <-> {}", k->Dump(), field->Dump(), vector->Dump());
}

std::unique_ptr<Node> Clone() const override {
return std::make_unique<VectorKnnExpr>(Node::MustAs<FieldRef>(field->Clone()),
Node::MustAs<NumericLiteral>(k->Clone()),
Node::MustAs<VectorLiteral>(vector->Clone()));
}
};

struct BoolLiteral : BoolAtomExpr, Literal {
bool val;

@@ -336,18 +393,30 @@ struct LimitClause : Node {
std::string Content() const override { return fmt::format("{}, {}", offset, count); }

std::unique_ptr<Node> Clone() const override { return std::make_unique<LimitClause>(*this); }
size_t Offset() const { return offset; }

size_t Count() const { return count; }
};

struct SortByClause : Node {
enum Order { ASC, DESC } order = ASC;
std::unique_ptr<FieldRef> field;
std::unique_ptr<VectorLiteral> vector = nullptr;

SortByClause(Order order, std::unique_ptr<FieldRef> &&field) : order(order), field(std::move(field)) {}
SortByClause(std::unique_ptr<FieldRef> &&field, std::unique_ptr<VectorLiteral> &&vector)
: field(std::move(field)), vector(std::move(vector)) {}

static constexpr const char *OrderToString(Order order) { return order == ASC ? "asc" : "desc"; }
bool IsVectorField() const { return vector != nullptr; }

std::string_view Name() const override { return "SortByClause"; }
std::string Dump() const override { return fmt::format("sortby {}, {}", field->Dump(), OrderToString(order)); }
std::string Dump() const override {
if (!IsVectorField()) {
return fmt::format("sortby {}, {}", field->Dump(), OrderToString(order));
}
return fmt::format("sortby {} <-> {}", field->Dump(), vector->Dump());
}
std::string Content() const override { return OrderToString(order); }

NodeIterator ChildBegin() override { return NodeIterator(field.get()); };
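
A hedged sketch of how the new vector IR nodes compose: it builds a VectorRangeExpr and a VectorKnnExpr by hand and prints their Dump() output. It assumes FieldRef and NumericLiteral keep single-argument constructors (a name string and a double value respectively, neither shown in this diff); in kvrocks these nodes are normally produced by the KQIR parser rather than constructed manually:

#include <iostream>
#include <memory>
#include <vector>

#include "search/ir.h"  // the header changed above

int main() {
  using namespace kqir;

  // Range form: "every document whose `embedding` is within distance 5 of the query vector".
  auto range_expr = std::make_unique<VectorRangeExpr>(
      std::make_unique<FieldRef>("embedding"),                              // assumed ctor: field name
      std::make_unique<NumericLiteral>(5),                                  // assumed ctor: numeric value
      std::make_unique<VectorLiteral>(std::vector<double>{0.1, 0.2, 0.3}));
  std::cout << range_expr->Dump() << "\n";  // "<field> <-> <vector> < <range>" per the format string above

  // KNN form: the k nearest neighbours of the query vector on the same field.
  auto knn_expr = std::make_unique<VectorKnnExpr>(
      std::make_unique<FieldRef>("embedding"),
      std::make_unique<NumericLiteral>(10),
      std::make_unique<VectorLiteral>(std::vector<double>{0.1, 0.2, 0.3}));
  std::cout << knn_expr->Dump() << "\n";    // "KNN k=..., <field> <-> <vector>"

  // Clone() produces a deep copy, so a rewrite pass can keep the original node intact.
  auto copy = Node::MustAs<VectorKnnExpr>(knn_expr->Clone());
  std::cout << copy->Dump() << "\n";
}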
63 changes: 63 additions & 0 deletions src/search/ir_sema_checker.h
@@ -50,6 +50,9 @@ struct SemaChecker {
GET_OR_RET(Check(v->query_expr.get()));
if (v->limit) GET_OR_RET(Check(v->limit.get()));
if (v->sort_by) GET_OR_RET(Check(v->sort_by.get()));
if (v->sort_by && v->sort_by->IsVectorField() && !v->limit) {
return {Status::NotOK, "expect a LIMIT clause for vector field to construct a KNN search"};
}
} else {
return {Status::NotOK, fmt::format("index `{}` not found", index_name)};
}
@@ -60,8 +63,25 @@
return {Status::NotOK, fmt::format("field `{}` not found in index `{}`", v->field->name, current_index->name)};
} else if (!iter->second.IsSortable()) {
return {Status::NotOK, fmt::format("field `{}` is not sortable", v->field->name)};
} else if (auto is_vector = iter->second.MetadataAs<redis::HnswVectorFieldMetadata>() != nullptr;
is_vector != v->IsVectorField()) {
std::string not_str = is_vector ? "" : "not ";
return {Status::NotOK,
fmt::format("field `{}` is {}a vector field according to metadata and does {}expect a vector parameter",
v->field->name, not_str, not_str)};
} else {
v->field->info = &iter->second;
if (v->IsVectorField()) {
auto meta = v->field->info->MetadataAs<redis::HnswVectorFieldMetadata>();
if (!v->field->info->HasIndex()) {
return {Status::NotOK,
fmt::format("field `{}` is marked as NOINDEX and cannot be used for KNN search", v->field->name)};
}
if (v->vector->values.size() != meta->dim) {
return {Status::NotOK,
fmt::format("vector should be of size `{}` for field `{}`", meta->dim, v->field->name)};
}
}
}
} else if (auto v = dynamic_cast<AndExpr *>(node)) {
for (const auto &n : v->inners) {
@@ -97,6 +117,49 @@
} else {
v->field->info = &iter->second;
}
} else if (auto v = dynamic_cast<VectorKnnExpr *>(node)) {
if (auto iter = current_index->fields.find(v->field->name); iter == current_index->fields.end()) {
return {Status::NotOK, fmt::format("field `{}` not found in index `{}`", v->field->name, current_index->name)};
} else if (!iter->second.MetadataAs<redis::HnswVectorFieldMetadata>()) {
return {Status::NotOK, fmt::format("field `{}` is not a vector field", v->field->name)};
} else {
v->field->info = &iter->second;

if (!v->field->info->HasIndex()) {
return {Status::NotOK,
fmt::format("field `{}` is marked as NOINDEX and cannot be used for KNN search", v->field->name)};
}
if (v->k->val <= 0) {
return {Status::NotOK, fmt::format("KNN search parameter `k` must be greater than 0")};
}
auto meta = v->field->info->MetadataAs<redis::HnswVectorFieldMetadata>();
if (v->vector->values.size() != meta->dim) {
return {Status::NotOK,
fmt::format("vector should be of size `{}` for field `{}`", meta->dim, v->field->name)};
}
}
} else if (auto v = dynamic_cast<VectorRangeExpr *>(node)) {
if (auto iter = current_index->fields.find(v->field->name); iter == current_index->fields.end()) {
return {Status::NotOK, fmt::format("field `{}` not found in index `{}`", v->field->name, current_index->name)};
} else if (!iter->second.MetadataAs<redis::HnswVectorFieldMetadata>()) {
return {Status::NotOK, fmt::format("field `{}` is not a vector field", v->field->name)};
} else {
v->field->info = &iter->second;

auto meta = v->field->info->MetadataAs<redis::HnswVectorFieldMetadata>();
if (meta->distance_metric == redis::DistanceMetric::L2 && v->range->val < 0) {
return {Status::NotOK, "range cannot be a negative number for l2 distance metric"};
}

if (meta->distance_metric == redis::DistanceMetric::COSINE && (v->range->val < 0 || v->range->val > 2)) {
return {Status::NotOK, "range has to be between 0 and 2 for cosine distance metric"};
}

if (v->vector->values.size() != meta->dim) {
return {Status::NotOK,
fmt::format("vector should be of size `{}` for field `{}`", meta->dim, v->field->name)};
}
}
} else if (auto v = dynamic_cast<SelectClause *>(node)) {
for (const auto &n : v->fields) {
if (auto iter = current_index->fields.find(n->name); iter == current_index->fields.end()) {
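
The two range checks above encode properties of the distance metrics rather than anything kvrocks-specific: an L2 distance is never negative, and — assuming COSINE means the usual cosine distance 1 - cos(a, b) — the distance always falls in [0, 2]. A tiny standalone sketch of the cosine bound:

#include <cmath>
#include <iostream>
#include <vector>

// Cosine distance = 1 - cosine similarity. cos(a, b) lies in [-1, 1], so the distance lies in [0, 2],
// which is exactly the interval the semantic checker accepts for a COSINE range query.
double CosineDistance(const std::vector<double> &a, const std::vector<double> &b) {
  double dot = 0, na = 0, nb = 0;
  for (size_t i = 0; i < a.size(); ++i) {  // assumes a and b have the same dimension
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return 1.0 - dot / (std::sqrt(na) * std::sqrt(nb));
}

int main() {
  std::cout << CosineDistance({1, 0}, {1, 0}) << "\n";   // 0: same direction
  std::cout << CosineDistance({1, 0}, {0, 1}) << "\n";   // 1: orthogonal
  std::cout << CosineDistance({1, 0}, {-1, 0}) << "\n";  // 2: opposite direction
}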
(The remaining 18 changed files are not shown in this view.)