From e441e5a72faa7743dc377aba5469cb3dccd4cb3a Mon Sep 17 00:00:00 2001 From: ik Date: Sun, 11 Aug 2024 22:44:25 +0800 Subject: [PATCH] feat: support write method Signed-off-by: ik --- include/CMakeLists.txt | 1 + include/opengemini/Client.hpp | 83 +++++ include/opengemini/Point.hpp | 50 +++ include/opengemini/impl/Client.ipp | 24 ++ include/opengemini/impl/ClientImpl.hpp | 6 + include/opengemini/impl/ClientImpl.tpp | 31 ++ include/opengemini/impl/cli/Functor.hpp | 10 + include/opengemini/impl/cli/Write.tpp | 48 +++ .../impl/comm/CompletionSignature.hpp | 2 + include/opengemini/impl/comm/UrlTargets.hpp | 1 + .../impl/enc/LineProtocolEncoder.cpp | 189 ++++++++++ .../impl/enc/LineProtocolEncoder.hpp | 88 +++++ test/unit/CMakeLists.txt | 1 + test/unit/Client_Test.cpp | 55 +++ .../impl/enc/LineProtocolEncoder_Test.cpp | 346 ++++++++++++++++++ 15 files changed, 935 insertions(+) create mode 100644 include/opengemini/Point.hpp create mode 100644 include/opengemini/impl/cli/Write.tpp create mode 100644 include/opengemini/impl/enc/LineProtocolEncoder.cpp create mode 100644 include/opengemini/impl/enc/LineProtocolEncoder.hpp create mode 100644 test/unit/impl/enc/LineProtocolEncoder_Test.cpp diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 5ad72e4..3b1aedc 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -46,6 +46,7 @@ else() opengemini/impl/ClientConfigBuilder.cpp opengemini/impl/ErrorCode.cpp opengemini/impl/comm/Context.cpp + opengemini/impl/enc/LineProtocolEncoder.cpp opengemini/impl/http/IHttpClient.cpp opengemini/impl/http/HttpClient.cpp opengemini/impl/http/HttpsClient.cpp diff --git a/include/opengemini/Client.hpp b/include/opengemini/Client.hpp index ecda77d..67adcea 100644 --- a/include/opengemini/Client.hpp +++ b/include/opengemini/Client.hpp @@ -21,6 +21,7 @@ #include "opengemini/ClientConfig.hpp" #include "opengemini/CompletionToken.hpp" +#include "opengemini/Point.hpp" #include "opengemini/Query.hpp" #include "opengemini/RetentionPolicy.hpp" @@ -357,6 +358,88 @@ class Client { std::string_view retentionPolicy, COMPLETION_TOKEN&& token = {}); + /// + /// \~English + /// @brief Write a point. + /// @param database Name of the database. + /// @param point Single point as @ref Point . + /// @param retentionPolicy Name of the retention policy, default to empty + /// string (no retention policy is specified). + /// @param token The completion token which will be invoked when the task + /// complete. Default to @ref token::sync if this parameter is not + /// specified. If passing function object as token, the function signature + /// must be: + /// @code + /// void ( + /// // Result of operation, it means success if the value is nullptr, + /// // otherwise, contains an exception. + /// std::exception_ptr error + /// ) + /// @endcode + /// + /// \~Chinese + /// @brief 写入一个点位。 + /// @param database 数据库名称。 + /// @param point 单个点位@ref Point 。 + /// @param retentionPolicy + /// 保留策略名称,默认值为空字符串(即不指定保留策略)。 + /// @param token 任务完成令牌,将在任务完成后被调用。 + /// 若没有指定该参数,则使用默认值:@ref token::sync 。 + /// 若传递函数对象作为完成令牌,则其签名必须满足: + /// @code + /// void ( + /// // 执行结果,仅当值为nullptr时代表成功,否则将承载相关的异常。 + /// std::exception_ptr error + /// ) + /// @endcode + /// + template + [[nodiscard]] auto Write(std::string_view database, + Point point, + std::string_view retentionPolicy = {}, + COMPLETION_TOKEN&& token = {}); + + /// + /// \~English + /// @brief Write multiple points. + /// @param database Name of the database. + /// @param points A vector of points. + /// @param retentionPolicy Name of the retention policy, default to empty + /// string (no retention policy is specified). + /// @param token The completion token which will be invoked when the task + /// complete. Default to @ref token::sync if this parameter is not + /// specified. If passing function object as token, the function signature + /// must be: + /// @code + /// void ( + /// // Result of operation, it means success if the value is nullptr, + /// // otherwise, contains an exception. + /// std::exception_ptr error + /// ) + /// @endcode + /// + /// \~Chinese + /// @brief 写入多个点位。 + /// @param database 数据库名称。 + /// @param points 点位数组。 + /// @param retentionPolicy + /// 保留策略名称,默认值为空字符串(即不指定保留策略)。 + /// @param token 任务完成令牌,将在任务完成后被调用。 + /// 若没有指定该参数,则使用默认值:@ref token::sync 。 + /// 若传递函数对象作为完成令牌,则其签名必须满足: + /// @code + /// void ( + /// // 执行结果,仅当值为nullptr时代表成功,否则将承载相关的异常。 + /// std::exception_ptr error + /// ) + /// @endcode + /// + template + [[nodiscard]] auto Write(std::string_view database, + std::vector points, + std::string_view retentionPolicy = {}, + COMPLETION_TOKEN&& token = {}); + private: Client(const Client&) = delete; Client& operator=(const Client&) = delete; diff --git a/include/opengemini/Point.hpp b/include/opengemini/Point.hpp new file mode 100644 index 0000000..1b67ace --- /dev/null +++ b/include/opengemini/Point.hpp @@ -0,0 +1,50 @@ +// +// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef OPENGEMINI_POINT_HPP +#define OPENGEMINI_POINT_HPP + +#include +#include +#include +#include + +#include "opengemini/Precision.hpp" + +namespace opengemini { + +/// +/// \~English +/// @brief Holds the point data. +/// +/// \~Chinese +/// @brief 点位数据。 +/// +struct Point { + using Field = std::variant; + using Time = std::chrono::time_point; + + std::string measurement; + std::map fields; + Time time; + std::map tags; + Precision precision{ Precision::Nanosecond }; +}; + +} // namespace opengemini + +#endif // !OPENGEMINI_POINT_HPP diff --git a/include/opengemini/impl/Client.ipp b/include/opengemini/impl/Client.ipp index 9c24746..07409b0 100644 --- a/include/opengemini/impl/Client.ipp +++ b/include/opengemini/impl/Client.ipp @@ -101,4 +101,28 @@ auto Client::DropRetentionPolicy(std::string_view database, std::forward(token)); } +template +auto Client::Write(std::string_view database, + Point point, + std::string_view retentionPolicy, + COMPLETION_TOKEN&& token) +{ + return impl_->Write(database, + std::move(point), + retentionPolicy, + std::forward(token)); +} + +template +auto Client::Write(std::string_view database, + std::vector points, + std::string_view retentionPolicy, + COMPLETION_TOKEN&& token) +{ + return impl_->Write(database, + std::move(points), + retentionPolicy, + std::forward(token)); +} + } // namespace opengemini diff --git a/include/opengemini/impl/ClientImpl.hpp b/include/opengemini/impl/ClientImpl.hpp index ac71c71..244f1be 100644 --- a/include/opengemini/impl/ClientImpl.hpp +++ b/include/opengemini/impl/ClientImpl.hpp @@ -67,6 +67,12 @@ class ClientImpl { std::string_view retentionPolicy, COMPLETION_TOKEN&& token); + template + auto Write(std::string_view database, + POINT_TYPE point, + std::string_view retentionPolicy, + COMPLETION_TOKEN&& token); + private: std::shared_ptr ConstructHttpClient(const ClientConfig& config); diff --git a/include/opengemini/impl/ClientImpl.tpp b/include/opengemini/impl/ClientImpl.tpp index 78cc89d..18e9da9 100644 --- a/include/opengemini/impl/ClientImpl.tpp +++ b/include/opengemini/impl/ClientImpl.tpp @@ -194,6 +194,35 @@ auto ClientImpl::DropRetentionPolicy(std::string_view database, std::string(retentionPolicy)); } +template +auto ClientImpl::Write(std::string_view database, + POINT_TYPE point, + std::string_view retentionPolicy, + COMPLETION_TOKEN&& token) +{ + using Signature = sig::Write; + return boost::asio::async_initiate( + [this](auto&& token, + std::string database, + std::string retentionPolicy, + POINT_TYPE point) { + static_assert(util::IsInvocable_v, + "Completion signature of Write must be: " + "void(std::exception_ptr)"); + + Spawn( + Functor::RunWrite{ this, + std::move(database), + std::move(retentionPolicy), + std::move(point) }, + OPENGEMINI_PF(token)); + }, + token, + std::string(database), + std::string(retentionPolicy), + std::move(point)); +} + template + struct RunWrite { + ClientImpl* impl_; + std::string db_; + std::string rp_; + POINT_TYPE point_; + + void operator()(boost::asio::yield_context yield) const; + }; + struct RunQueryGet { ClientImpl* impl_; struct Query query_; diff --git a/include/opengemini/impl/cli/Write.tpp b/include/opengemini/impl/cli/Write.tpp new file mode 100644 index 0000000..8a66260 --- /dev/null +++ b/include/opengemini/impl/cli/Write.tpp @@ -0,0 +1,48 @@ +// +// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "opengemini/impl/cli/Functor.hpp" + +#include + +#include "opengemini/Exception.hpp" +#include "opengemini/impl/comm/UrlTargets.hpp" +#include "opengemini/impl/enc/LineProtocolEncoder.hpp" + +namespace opengemini::impl { + +template +void ClientImpl::Functor::RunWrite::operator()( + boost::asio::yield_context yield) const +{ + auto content = enc::LineProtocolEncoder{}.Encode(point_); + + boost::url target(url::WRITE); + target.set_query(fmt::format("db={}&rp={}", db_, rp_)); + + auto rsp = impl_->http_->Post(impl_->lb_->PickAvailableServer(), + target.buffer(), + std::move(content), + yield); + if (rsp.result() != http::Status::no_content) { + throw Exception(errc::ServerErrors::UnexpectedStatusCode, + fmt::format("Received code: {}, body:{}", + rsp.result_int(), + rsp.body())); + } +} + +} // namespace opengemini::impl diff --git a/include/opengemini/impl/comm/CompletionSignature.hpp b/include/opengemini/impl/comm/CompletionSignature.hpp index 9733f62..19a1231 100644 --- a/include/opengemini/impl/comm/CompletionSignature.hpp +++ b/include/opengemini/impl/comm/CompletionSignature.hpp @@ -38,6 +38,8 @@ using ShowRetentionPolicies = void(std::exception_ptr, std::vector); using DropRetentionPolicy = void(std::exception_ptr); +using Write = void(std::exception_ptr); + } // namespace opengemini::impl::sig #endif // !OPENGEMINI_IMPL_COMM_COMPLETIONSIGNATURE_HPP diff --git a/include/opengemini/impl/comm/UrlTargets.hpp b/include/opengemini/impl/comm/UrlTargets.hpp index f395f07..ac35d21 100644 --- a/include/opengemini/impl/comm/UrlTargets.hpp +++ b/include/opengemini/impl/comm/UrlTargets.hpp @@ -21,6 +21,7 @@ namespace opengemini::impl::url { inline constexpr auto PING = "/ping"; inline constexpr auto QUERY = "/query"; +inline constexpr auto WRITE = "/write"; } // namespace opengemini::impl::url diff --git a/include/opengemini/impl/enc/LineProtocolEncoder.cpp b/include/opengemini/impl/enc/LineProtocolEncoder.cpp new file mode 100644 index 0000000..99636d4 --- /dev/null +++ b/include/opengemini/impl/enc/LineProtocolEncoder.cpp @@ -0,0 +1,189 @@ +// +// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "opengemini/impl/enc/LineProtocolEncoder.hpp" + +#include +#include + +#include "opengemini/Exception.hpp" +#include "opengemini/Precision.hpp" + +namespace opengemini::impl::enc { + +namespace { + +template +inline auto Round(const Point::Time& time) +{ + return std::chrono::time_point_cast( + std::chrono::round(time)) + .time_since_epoch() + .count(); +} + +} // namespace + +OPENGEMINI_INLINE_SPECIFIER +LineProtocolEncoder::LineProtocolEncoder() +{ + os_.precision(12); +} + +OPENGEMINI_INLINE_SPECIFIER +std::string LineProtocolEncoder::Encode(const Point& point) +{ + AppendPoint(point); + return os_.str(); +} + +OPENGEMINI_INLINE_SPECIFIER +std::string LineProtocolEncoder::Encode(const std::vector& points) +{ + if (points.empty()) { + throw Exception(errc::LogicErrors::InvalidArgument, + "The points array should not be empty"); + } + + for (auto& point : points) { + AppendPoint(point); + Append(ELEMENT_LF); + } + + return os_.str(); +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendPoint(const Point& point) +{ + auto& [measurement, fields, time, tags, precision] = point; + AppendMeasurement(measurement); + AppendTags(tags); + AppendFields(fields); + AppendTimestamp(time, precision); +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendMeasurement(std::string_view measurement) +{ + if (measurement.empty()) { + throw Exception(errc::LogicErrors::InvalidArgument, + "The filed in Point must not be empty"); + } + + AppendEscapeString(measurement, ESCAPE_CHARS_MEASUREMENT); +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendTags(const decltype(Point::tags)& tags) +{ + for (auto& [key, value] : tags) { + Append(ELEMENT_COMMA); + AppendEscapeString(key, ESCAPE_CHARS_TAG); + Append(ELEMENT_EQUAL); + AppendEscapeString(value, ESCAPE_CHARS_TAG); + } +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendFields(const decltype(Point::fields)& fields) +{ + if (fields.empty()) { + throw Exception(errc::LogicErrors::InvalidArgument, + "The filed in Point must not be empty"); + } + + Append(ELEMENT_SPACE); + + assert(!fields.empty()); + std::for_each_n(fields.begin(), + fields.size() - 1, + [this](const auto& field) { + AppendField(field); + Append(ELEMENT_COMMA); + }); + + AppendField(*fields.rbegin()); +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendTimestamp(const Point::Time& time, + Precision precision) +{ + using namespace std::chrono; + + int64_t count{ 0 }; + switch (precision) { + case Precision::Nanosecond: count = time.time_since_epoch().count(); break; + case Precision::Microsecond: count = Round(time); break; + case Precision::Millisecond: count = Round(time); break; + case Precision::Second: count = Round(time); break; + case Precision::Minute: count = Round(time); break; + case Precision::Hour: count = Round(time); break; + } + + if (count != 0) { + Append(ELEMENT_SPACE); + Append(count); + } +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendField( + const decltype(Point::fields)::value_type& field) +{ + auto& [key, value] = field; + + AppendEscapeString(key, ESCAPE_CHARS_FIELD_KEY); + Append(ELEMENT_EQUAL); + std::visit( + [this](const auto& innerValue) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + Append(ELEMENT_DQUOTE); + AppendEscapeString(innerValue, ESCAPE_CHARS_FIELD_VALUE); + Append(ELEMENT_DQUOTE); + } + else if constexpr (std::is_same_v) { + Append(innerValue); + Append(ELEMENT_UINT); + } + else if constexpr (std::is_same_v) { + Append(innerValue); + Append(ELEMENT_INT); + } + else if constexpr (std::is_same_v) { + Append(innerValue ? ELEMENT_TRUE : ELEMENT_FALSE); + } + else if constexpr (std::is_same_v) { + Append(innerValue); + } + }, + value); +} + +OPENGEMINI_INLINE_SPECIFIER +void LineProtocolEncoder::AppendEscapeString(std::string_view origin, + std::string_view escapes) +{ + for (auto ch : origin) { + if (escapes.find(ch) != escapes.npos) { Append(ELEMENT_BSLASH); } + Append(ch); + } +} + +} // namespace opengemini::impl::enc diff --git a/include/opengemini/impl/enc/LineProtocolEncoder.hpp b/include/opengemini/impl/enc/LineProtocolEncoder.hpp new file mode 100644 index 0000000..7828214 --- /dev/null +++ b/include/opengemini/impl/enc/LineProtocolEncoder.hpp @@ -0,0 +1,88 @@ +// +// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef OPENGEMINI_IMPL_ENC_LINEPROTOCOLENCODER_HPP +#define OPENGEMINI_IMPL_ENC_LINEPROTOCOLENCODER_HPP + +#include +#include +#include + +#include "opengemini/Point.hpp" +#include "opengemini/impl/util/Preprocessor.hpp" + +namespace opengemini::impl::enc { + +class LineProtocolEncoder { +public: + LineProtocolEncoder(); + + std::string Encode(const Point& point); + std::string Encode(const std::vector& points); + +private: + void AppendPoint(const Point& point); + + void AppendMeasurement(std::string_view measurement); + void AppendTags(const decltype(Point::tags)& tags); + void AppendFields(const decltype(Point::fields)& fields); + void AppendTimestamp(const Point::Time& time, Precision precision); + + void AppendField(const decltype(Point::fields)::value_type& field); + void AppendEscapeString(std::string_view origin, std::string_view escapes); + + template + void Append(T&& t) + { + os_ << std::forward(t); + } + +private: + std::ostringstream os_; + + static constexpr auto ELEMENT_LF{ '\n' }; + static constexpr auto ELEMENT_COMMA{ ',' }; + static constexpr auto ELEMENT_EQUAL{ '=' }; + static constexpr auto ELEMENT_SPACE{ ' ' }; + static constexpr auto ELEMENT_DQUOTE{ '"' }; + static constexpr auto ELEMENT_BSLASH{ '\\' }; + + static constexpr auto ELEMENT_UINT{ 'u' }; + static constexpr auto ELEMENT_INT{ 'i' }; + static constexpr auto ELEMENT_TRUE{ 'T' }; + static constexpr auto ELEMENT_FALSE{ 'F' }; + + static constexpr char ESCAPE_CHARS_TAG[]{ ELEMENT_COMMA, + ELEMENT_EQUAL, + ELEMENT_SPACE }; + + static constexpr char ESCAPE_CHARS_FIELD_KEY[]{ ELEMENT_COMMA, + ELEMENT_EQUAL, + ELEMENT_SPACE }; + static constexpr char ESCAPE_CHARS_FIELD_VALUE[]{ ELEMENT_DQUOTE, + ELEMENT_BSLASH }; + + static constexpr char ESCAPE_CHARS_MEASUREMENT[]{ ELEMENT_COMMA, + ELEMENT_SPACE }; +}; + +} // namespace opengemini::impl::enc + +#ifndef OPENGEMINI_SEPARATE_COMPILATION +# include "opengemini/impl/enc/LineProtocolEncoder.cpp" +#endif // !OPENGEMINI_SEPARATE_COMPILATION +// +#endif // !OPENGEMINI_IMPL_ENC_LINEPROTOCOLENCODER_HPP diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index b31dce6..e5420ae 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable(UnitTest impl/cli/Ping_Test.cpp impl/cli/Query_Test.cpp impl/cli/RetentionPolicy_Test.cpp + impl/enc/LineProtocolEncoder_Test.cpp impl/http/IHttpClient_Test.cpp impl/lb/LoadBalancer_Test.cpp ) diff --git a/test/unit/Client_Test.cpp b/test/unit/Client_Test.cpp index 00d0d14..1d0447c 100644 --- a/test/unit/Client_Test.cpp +++ b/test/unit/Client_Test.cpp @@ -14,6 +14,7 @@ // limitations under the License. // +#include #include #include @@ -120,4 +121,58 @@ TEST(ClientTest, RetentionPolicy) client.DropDatabase(db); } +TEST(ClientTest, Write) +{ + Client client{ + ClientConfigBuilder().AppendAddress({ "127.0.0.1", 8086 }).Finalize() + }; + + std::cout << client.Query( + { "ExampleDatabase", "select * from ExampleMeasurement" }) + << std::endl; + + Point point1{ "ExampleMeasurement", + { { "key1", "val11111111111111111111111111111" }, + { "key2", 2222 }, + { "key3", 0.333 }, + { "key4", true }, + { "key5", -5555 } }, + std::chrono::system_clock::now(), + {}, + Precision::Nanosecond }; + + Point point2{ "ExampleMeasurement", + { { "key1", "val1" }, + { "key2", 2222 }, + { "key3", 0.333 }, + { "key4", true }, + { "key5", -5555 } }, + std::chrono::system_clock::now(), + {}, + Precision::Hour }; + + Point point3{ "ExampleMeasurement", + { { "key1", "val1" }, + { "key2", 2222 }, + { "key3", 0.333 }, + { "key4", true }, + { "key5", -5555 } }, + std::chrono::system_clock::now(), + {}, + Precision::Second }; + + Point point4{ point3 }; + + client.Write("ExampleDatabase", point1); + client.Write("ExampleDatabase", point2); + client.Write("ExampleDatabase", { point1, point2, point3 }); + client.Write("ExampleDatabase", std::move(point4)); + + std::this_thread::sleep_for(500ms); + + std::cout << client.Query( + { "ExampleDatabase", "select * from ExampleMeasurement" }) + << std::endl; +} + } // namespace opengemini::test diff --git a/test/unit/impl/enc/LineProtocolEncoder_Test.cpp b/test/unit/impl/enc/LineProtocolEncoder_Test.cpp new file mode 100644 index 0000000..493b170 --- /dev/null +++ b/test/unit/impl/enc/LineProtocolEncoder_Test.cpp @@ -0,0 +1,346 @@ +// +// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include "opengemini/impl/enc/LineProtocolEncoder.hpp" +#include "test/ExpectThrowAs.hpp" + +using namespace std::chrono_literals; + +namespace opengemini::test { + +using namespace opengemini::impl; + +TEST(LineProtocolEncoderTest, WithoutEscapedChars) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + "test,T0=0 a=1i 1"); +} + +TEST(LineProtocolEncoderTest, WithEscapedCharsInMeasurement) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test,", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + R"(test\,,T0=0 a=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test ", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + R"(test\ ,T0=0 a=1i 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test, test", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test\,\ test,T0=0 a=1i 1)"); +} + +TEST(LineProtocolEncoderTest, WithEscapedCharsInTagKey) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0,", "0" } } }), + R"(test,T0\,=0 a=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0=", "0" } } }), + R"(test,T0\==0 a=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0 ", "0" } } }), + R"(test,T0\ =0 a=1i 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T,=T T", "0" } } }), + R"(test,T\,\=T\ T=0 a=1i 1)"); +} + +TEST(LineProtocolEncoderTest, WithEscapedCharsInTagValue) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0," } } }), + R"(test,T0=0\, a=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0=" } } }), + R"(test,T0=0\= a=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0 " } } }), + R"(test,T0=0\ a=1i 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0=0,0 0" } } }), + R"(test,T0=0\=0\,0\ 0 a=1i 1)"); +} + +TEST(LineProtocolEncoderTest, WithEscapedCharsInFieldKey) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a,", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + R"(test,T0=0 a\,=1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a=", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + R"(test,T0=0 a\==1i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a ", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + R"(test,T0=0 a\ =1i 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a,= a a ", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a\,\=\ a\ a\ =1i 1)"); +} + +TEST(LineProtocolEncoderTest, WithEscapedCharsInFieldValue) +{ + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", R"(1")" } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a="1\"" 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", R"(1\)" } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a="1\\" 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", R"(1\\)" } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a="1\\\\" 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", R"(1\")" } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a="1\\\"" 1)"); +} + +TEST(LineProtocolEncoderTest, WithDifferentFieldTypes) +{ + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 1.0 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=1 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 1.2 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=1.2 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 12.345678901234 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=12.3456789012 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 12345 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=12345i 1)"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", 12345678901234567890ul } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=12345678901234567890u 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", true } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=T 1)"); + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode({ "test", + { { "a", false } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }), + R"(test,T0=0 a=F 1)"); +} + +TEST(LineProtocolEncoderTest, WithDifferentTimeUnderDefaultPrecision) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }), + "test,T0=0 a=1i 1"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1us }, { { "T0", "0" } } }), + "test,T0=0 a=1i 1000"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1ms }, { { "T0", "0" } } }), + "test,T0=0 a=1i 1000000"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1s }, { { "T0", "0" } } }), + "test,T0=0 a=1i 1000000000"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1min }, { { "T0", "0" } } }), + "test,T0=0 a=1i 60000000000"); + + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "test", { { "a", 1 } }, Point::Time{ 1h }, { { "T0", "0" } } }), + "test,T0=0 a=1i 3600000000000"); +} + +TEST(LineProtocolEncoderTest, WithDifferentPrecision) +{ + Point point{ "test", + { { "a", 1 } }, + Point::Time{ 1h + 2min + 3s + 4ms + 5us + 6ns }, + { { "T0", "0" } } }; + + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3723004005006"); + + point.precision = Precision::Hour; + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3600000000000"); + + point.precision = Precision::Minute; + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3720000000000"); + + point.precision = Precision::Second; + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3723000000000"); + + point.precision = Precision::Millisecond; + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3723004000000"); + + point.precision = Precision::Microsecond; + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode(point), + "test,T0=0 a=1i 3723004005000"); +} + +TEST(LineProtocolEncoderTest, WithEmptyMeasurement) +{ + EXPECT_THROW_AS( + enc::LineProtocolEncoder{}.Encode(Point{ {}, { { "field0", 0 } } }), + errc::LogicErrors::InvalidArgument); +} + +TEST(LineProtocolEncoderTest, WithEmptyFields) +{ + EXPECT_THROW_AS( + enc::LineProtocolEncoder{}.Encode(Point{ "test_measurement", {} }), + errc::LogicErrors::InvalidArgument); +} + +TEST(LineProtocolEncoderTest, WithMultiFields) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode({ "ExampleMeasurement", + { { "field1", "val1" }, + { "field2", 2222 }, + { "field3", 0.333 }, + { "field4", true }, + { "field5", -5555 }, + { "field6", 6666ul } }, + Point::Time{ 1ns } }), + R"(ExampleMeasurement field1="val1",field2=2222i,field3=0.333,field4=T,field5=-5555i,field6=6666u 1)"); +} + +TEST(LineProtocolEncoderTest, WithoutTime) +{ + EXPECT_EQ(enc::LineProtocolEncoder{}.Encode( + { "ExampleMeasurement", { { "field1", "val1" } } }), + R"(ExampleMeasurement field1="val1")"); +} + +TEST(LineProtocolEncoderTest, WithTags) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { "ExampleMeasurement", + { { "field1", "val1" } }, + Point::Time{ 1ns }, + { { "tag1", "val1" }, { "tag2", "val2" }, { "tag3", "val3" } } }), + R"(ExampleMeasurement,tag1=val1,tag2=val2,tag3=val3 field1="val1" 1)"); +} + +TEST(LineProtocolEncoderTest, WithMultiPoints) +{ + EXPECT_EQ( + enc::LineProtocolEncoder{}.Encode( + { { "test", { { "a", 1 } }, Point::Time{ 1ns }, { { "T0", "0" } } }, + { "test,", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }, + { "test", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T0,", "0" } } }, + { "test", + { { "a", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0," } } }, + { "test", + { { "a,", 1 } }, + Point::Time{ 1ns }, + { { "T0", "0" } } }, + { "test", + { { "a", R"(1")" } }, + Point::Time{ 1ns }, + { { "T0", "0" } } } }), + "test,T0=0 a=1i 1\ntest\\,,T0=0 a=1i 1\ntest,T0\\,=0 a=1i " + "1\ntest,T0=0\\, a=1i 1\ntest,T0=0 a\\,=1i 1\ntest,T0=0 a=\"1\\\"\" " + "1\n"); +} + +TEST(LineProtocolEncoderTest, WithMultiPointsActuallyEmpty) +{ + EXPECT_THROW_AS(enc::LineProtocolEncoder{}.Encode(std::vector{}), + errc::LogicErrors::InvalidArgument); +} + +} // namespace opengemini::test