Skip to content

Commit

Permalink
feat: support write method
Browse files Browse the repository at this point in the history
Signed-off-by: ik <[email protected]>
  • Loading branch information
hiiiik committed Aug 11, 2024
1 parent 02e1555 commit e441e5a
Show file tree
Hide file tree
Showing 15 changed files with 935 additions and 0 deletions.
1 change: 1 addition & 0 deletions include/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ else()
opengemini/impl/ClientConfigBuilder.cpp
opengemini/impl/ErrorCode.cpp
opengemini/impl/comm/Context.cpp
opengemini/impl/enc/LineProtocolEncoder.cpp
opengemini/impl/http/IHttpClient.cpp
opengemini/impl/http/HttpClient.cpp
opengemini/impl/http/HttpsClient.cpp
Expand Down
83 changes: 83 additions & 0 deletions include/opengemini/Client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "opengemini/ClientConfig.hpp"
#include "opengemini/CompletionToken.hpp"
#include "opengemini/Point.hpp"
#include "opengemini/Query.hpp"
#include "opengemini/RetentionPolicy.hpp"

Expand Down Expand Up @@ -357,6 +358,88 @@ class Client {
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token = {});

///
/// \~English
/// @brief Write a point.
/// @param database Name of the database.
/// @param point Single point as @ref Point .
/// @param retentionPolicy Name of the retention policy, default to empty
/// string (no retention policy is specified).
/// @param token The completion token which will be invoked when the task
/// complete. Default to @ref token::sync if this parameter is not
/// specified. If passing function object as token, the function signature
/// must be:
/// @code
/// void (
/// // Result of operation, it means success if the value is nullptr,
/// // otherwise, contains an exception.
/// std::exception_ptr error
/// )
/// @endcode
///
/// \~Chinese
/// @brief 写入一个点位。
/// @param database 数据库名称。
/// @param point 单个点位@ref Point 。
/// @param retentionPolicy
/// 保留策略名称,默认值为空字符串(即不指定保留策略)。
/// @param token 任务完成令牌,将在任务完成后被调用。
/// 若没有指定该参数,则使用默认值:@ref token::sync 。
/// 若传递函数对象作为完成令牌,则其签名必须满足:
/// @code
/// void (
/// // 执行结果,仅当值为nullptr时代表成功,否则将承载相关的异常。
/// std::exception_ptr error
/// )
/// @endcode
///
template<typename COMPLETION_TOKEN = token::Sync>
[[nodiscard]] auto Write(std::string_view database,
Point point,
std::string_view retentionPolicy = {},
COMPLETION_TOKEN&& token = {});

///
/// \~English
/// @brief Write multiple points.
/// @param database Name of the database.
/// @param points A vector of points.
/// @param retentionPolicy Name of the retention policy, default to empty
/// string (no retention policy is specified).
/// @param token The completion token which will be invoked when the task
/// complete. Default to @ref token::sync if this parameter is not
/// specified. If passing function object as token, the function signature
/// must be:
/// @code
/// void (
/// // Result of operation, it means success if the value is nullptr,
/// // otherwise, contains an exception.
/// std::exception_ptr error
/// )
/// @endcode
///
/// \~Chinese
/// @brief 写入多个点位。
/// @param database 数据库名称。
/// @param points 点位数组。
/// @param retentionPolicy
/// 保留策略名称,默认值为空字符串(即不指定保留策略)。
/// @param token 任务完成令牌,将在任务完成后被调用。
/// 若没有指定该参数,则使用默认值:@ref token::sync 。
/// 若传递函数对象作为完成令牌,则其签名必须满足:
/// @code
/// void (
/// // 执行结果,仅当值为nullptr时代表成功,否则将承载相关的异常。
/// std::exception_ptr error
/// )
/// @endcode
///
template<typename COMPLETION_TOKEN = token::Sync>
[[nodiscard]] auto Write(std::string_view database,
std::vector<Point> points,
std::string_view retentionPolicy = {},
COMPLETION_TOKEN&& token = {});

private:
Client(const Client&) = delete;
Client& operator=(const Client&) = delete;
Expand Down
50 changes: 50 additions & 0 deletions include/opengemini/Point.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef OPENGEMINI_POINT_HPP
#define OPENGEMINI_POINT_HPP

#include <chrono>
#include <map>
#include <string>
#include <variant>

#include "opengemini/Precision.hpp"

namespace opengemini {

///
/// \~English
/// @brief Holds the point data.
///
/// \~Chinese
/// @brief 点位数据。
///
struct Point {
using Field = std::variant<double, int64_t, uint64_t, std::string, bool>;
using Time = std::chrono::time_point<std::chrono::system_clock,
std::chrono::nanoseconds>;

std::string measurement;
std::map<std::string, Field> fields;
Time time;
std::map<std::string, std::string> tags;
Precision precision{ Precision::Nanosecond };
};

} // namespace opengemini

#endif // !OPENGEMINI_POINT_HPP
24 changes: 24 additions & 0 deletions include/opengemini/impl/Client.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,28 @@ auto Client::DropRetentionPolicy(std::string_view database,
std::forward<COMPLETION_TOKEN>(token));
}

template<typename COMPLETION_TOKEN>
auto Client::Write(std::string_view database,
Point point,
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token)
{
return impl_->Write(database,
std::move(point),
retentionPolicy,
std::forward<COMPLETION_TOKEN>(token));
}

template<typename COMPLETION_TOKEN>
auto Client::Write(std::string_view database,
std::vector<Point> points,
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token)
{
return impl_->Write(database,
std::move(points),
retentionPolicy,
std::forward<COMPLETION_TOKEN>(token));
}

} // namespace opengemini
6 changes: 6 additions & 0 deletions include/opengemini/impl/ClientImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ class ClientImpl {
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token);

template<typename COMPLETION_TOKEN, typename POINT_TYPE>
auto Write(std::string_view database,
POINT_TYPE point,
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token);

private:
std::shared_ptr<http::IHttpClient>
ConstructHttpClient(const ClientConfig& config);
Expand Down
31 changes: 31 additions & 0 deletions include/opengemini/impl/ClientImpl.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ auto ClientImpl::DropRetentionPolicy(std::string_view database,
std::string(retentionPolicy));
}

template<typename COMPLETION_TOKEN, typename POINT_TYPE>
auto ClientImpl::Write(std::string_view database,
POINT_TYPE point,
std::string_view retentionPolicy,
COMPLETION_TOKEN&& token)
{
using Signature = sig::Write;
return boost::asio::async_initiate<COMPLETION_TOKEN, Signature>(
[this](auto&& token,
std::string database,
std::string retentionPolicy,
POINT_TYPE point) {
static_assert(util::IsInvocable_v<decltype(token), Signature>,
"Completion signature of Write must be: "
"void(std::exception_ptr)");

Spawn<Signature>(
Functor::RunWrite<POINT_TYPE>{ this,
std::move(database),
std::move(retentionPolicy),
std::move(point) },
OPENGEMINI_PF(token));
},
token,
std::string(database),
std::string(retentionPolicy),
std::move(point));
}

template<typename COMPLETION_SIGNATURE,
typename COMPLETION_TOKEN,
typename FUNCTION,
Expand Down Expand Up @@ -228,3 +257,5 @@ void ClientImpl::Spawn(FUNCTION&& func, COMPLETION_TOKEN&& token)
}

} // namespace opengemini::impl

#include "opengemini/impl/cli/Write.tpp"
10 changes: 10 additions & 0 deletions include/opengemini/impl/cli/Functor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ struct ClientImpl::Functor {
void operator()(boost::asio::yield_context yield) const;
};

template<typename POINT_TYPE>
struct RunWrite {
ClientImpl* impl_;
std::string db_;
std::string rp_;
POINT_TYPE point_;

void operator()(boost::asio::yield_context yield) const;
};

struct RunQueryGet {
ClientImpl* impl_;
struct Query query_;
Expand Down
48 changes: 48 additions & 0 deletions include/opengemini/impl/cli/Write.tpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "opengemini/impl/cli/Functor.hpp"

#include <boost/url.hpp>

#include "opengemini/Exception.hpp"
#include "opengemini/impl/comm/UrlTargets.hpp"
#include "opengemini/impl/enc/LineProtocolEncoder.hpp"

namespace opengemini::impl {

template<typename POINT_TYPE>
void ClientImpl::Functor::RunWrite<POINT_TYPE>::operator()(
boost::asio::yield_context yield) const
{
auto content = enc::LineProtocolEncoder{}.Encode(point_);

boost::url target(url::WRITE);
target.set_query(fmt::format("db={}&rp={}", db_, rp_));

auto rsp = impl_->http_->Post(impl_->lb_->PickAvailableServer(),
target.buffer(),
std::move(content),
yield);
if (rsp.result() != http::Status::no_content) {
throw Exception(errc::ServerErrors::UnexpectedStatusCode,
fmt::format("Received code: {}, body:{}",
rsp.result_int(),
rsp.body()));
}
}

} // namespace opengemini::impl
2 changes: 2 additions & 0 deletions include/opengemini/impl/comm/CompletionSignature.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ using ShowRetentionPolicies = void(std::exception_ptr,
std::vector<RetentionPolicy>);
using DropRetentionPolicy = void(std::exception_ptr);

using Write = void(std::exception_ptr);

} // namespace opengemini::impl::sig

#endif // !OPENGEMINI_IMPL_COMM_COMPLETIONSIGNATURE_HPP
1 change: 1 addition & 0 deletions include/opengemini/impl/comm/UrlTargets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace opengemini::impl::url {

inline constexpr auto PING = "/ping";
inline constexpr auto QUERY = "/query";
inline constexpr auto WRITE = "/write";

} // namespace opengemini::impl::url

Expand Down
Loading

0 comments on commit e441e5a

Please sign in to comment.