Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync sdks 4 #301

Merged
merged 35 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1a775f7
Returned federated_topic tests
Gazizonoki Aug 23, 2024
8fe04eb
Moved commit "fix reordering of messages" from ydb repo
nshestakov Aug 23, 2024
27c607b
Moved commit "Added add field for pool id into gRPC api" from ydb repo
GrigoriyPA Aug 23, 2024
a995392
Moved commit "Report & show replication lag" from ydb repo
CyberROFL Aug 23, 2024
edcad84
Moved commit "Add start and end times to Operations API" from ydb repo
pixcc Aug 23, 2024
102877c
Moved commit "Extend index tables' partitioning settings to be able t…
jepett0 Aug 23, 2024
8e429b8
Moved commit "Federated write session: fix deadlock" from ydb repo
qyryq Aug 23, 2024
24f1641
Moved commit "CDC Initial Scan progress" from ydb repo
CyberROFL Aug 23, 2024
e584d37
Moved commit "PQ_V1 SDK auto partitioning support" from ydb repo
niksaveliev Aug 23, 2024
6984ae6
Moved commit "Federated Topic: Log on federation discovery failure" f…
qyryq Aug 23, 2024
c65fe0f
Moved commit "Replication stats (total & per item): lag, initial scan…
CyberROFL Aug 23, 2024
f4b2e42
Moved commit "Fix for federated topic" from ydb repo
qyryq Aug 23, 2024
6ba38eb
Moved commit "autopartitioning for kinesis" from ydb repo
nshestakov Aug 23, 2024
2fe54ce
Moved commit "Memory leak in Topic SDK" from ydb repo
Alek5andr-Kotov Aug 23, 2024
0a27948
Moved commit "Add created_by to Operations API" from ydb repo
pixcc Aug 23, 2024
5c359a3
Moved commit "Create vector index in SchemeShard" from ydb repo
azevaykin Aug 26, 2024
a21ef5d
Moved commit "ydb_topic: Fallback to "indirect" write if the server d…
qyryq Aug 26, 2024
01a4fbc
Moved commit "Add permissions to ydb tools dump" from ydb repo
pixcc Aug 26, 2024
f012456
Moved commit "YQ-3447 support ydb scheme ls for resource pools" from …
GrigoriyPA Aug 26, 2024
7a70b5c
Moved commit "Add PAUSED strategy for autopartitioning of the topic" …
nshestakov Aug 26, 2024
4b10051
Moved commit "Use 1-thread pool executor for subsession event handler…
qyryq Aug 26, 2024
8b95d53
Moved commit "ydb_topic write session: lower log level for retryable …
qyryq Aug 26, 2024
566ac7a
Moved commit "Enable/disable ssl connections, return connection_strin…
CyberROFL Aug 26, 2024
56d4674
Moved commit "Add few auto partitioning fields to describe and SDK" f…
niksaveliev Aug 26, 2024
db347eb
Moved commit "Call ByteSizeLong on each WriteRequest separately" from…
qyryq Aug 26, 2024
a56ab71
Moved commit "Move unacknowledged messages back to OriginalMessagesTo…
qyryq Aug 26, 2024
12b4b8e
Moved commit "Ignore message acks from previous subsessions" from ydb…
qyryq Aug 26, 2024
6dfdca8
Moved commit "Use a separate lock for Processor->Write calls" from yd…
qyryq Aug 26, 2024
985b61a
Moved commit "code EES_WRITTEN_IN_TX" from ydb repo
Alek5andr-Kotov Aug 26, 2024
06422e1
Moved commit "Fix TNodeRegistrationResult" from ydb repo
UgnineSirdis Aug 26, 2024
6b29678
Moved commit "Revert separate lock for Processor->Write calls" from y…
qyryq Aug 26, 2024
f48930a
Moved commit "Add tests for pqv1" from ydb repo
nshestakov Aug 26, 2024
ce4222c
Moved commit "Added new versions of RetryQuery" from ydb repo
stanislav-shchetinin Aug 26, 2024
18d21dd
Moved commit "Restore indexes from backup with the original partition…
jepett0 Aug 26, 2024
87bd3a3
Moved commit "The value of the WriteInflightSize in the main partitio…
Alek5andr-Kotov Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions include/ydb-cpp-sdk/client/datastreams/datastreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,158 @@ namespace NYdb::NDataStreams::V1 {
std::string ExplicitHashDecimal;
};

enum class EAutoPartitioningStrategy: uint32_t {
Unspecified = 0,
Disabled = 1,
ScaleUp = 2,
ScaleUpAndDown = 3,
Paused = 4,
};

struct TCreateStreamSettings;
struct TUpdateStreamSettings;


template<typename TSettings>
struct TPartitioningSettingsBuilder;
template<typename TSettings>
struct TAutoPartitioningSettingsBuilder;

struct TAutoPartitioningSettings {
friend struct TAutoPartitioningSettingsBuilder<TCreateStreamSettings>;
friend struct TAutoPartitioningSettingsBuilder<TUpdateStreamSettings>;
public:
TAutoPartitioningSettings()
: Strategy_(EAutoPartitioningStrategy::Disabled)
, StabilizationWindow_(TDuration::Seconds(0))
, DownUtilizationPercent_(0)
, UpUtilizationPercent_(0) {
}
TAutoPartitioningSettings(const Ydb::DataStreams::V1::AutoPartitioningSettings& settings);
TAutoPartitioningSettings(EAutoPartitioningStrategy strategy, TDuration stabilizationWindow, uint64_t downUtilizationPercent, uint64_t upUtilizationPercent)
: Strategy_(strategy)
, StabilizationWindow_(stabilizationWindow)
, DownUtilizationPercent_(downUtilizationPercent)
, UpUtilizationPercent_(upUtilizationPercent) {}

EAutoPartitioningStrategy GetStrategy() const { return Strategy_; };
TDuration GetStabilizationWindow() const { return StabilizationWindow_; };
uint32_t GetDownUtilizationPercent() const { return DownUtilizationPercent_; };
uint32_t GetUpUtilizationPercent() const { return UpUtilizationPercent_; };
private:
EAutoPartitioningStrategy Strategy_;
TDuration StabilizationWindow_;
uint32_t DownUtilizationPercent_;
uint32_t UpUtilizationPercent_;
};


class TPartitioningSettings {
using TSelf = TPartitioningSettings;
friend struct TPartitioningSettingsBuilder<TCreateStreamSettings>;
friend struct TPartitioningSettingsBuilder<TUpdateStreamSettings>;
public:
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), AutoPartitioningSettings_(){}
TPartitioningSettings(const Ydb::DataStreams::V1::PartitioningSettings& settings);
TPartitioningSettings(uint64_t minActivePartitions, uint64_t maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
: MinActivePartitions_(minActivePartitions)
, MaxActivePartitions_(maxActivePartitions)
, AutoPartitioningSettings_(autoscalingSettings) {
}

uint64_t GetMinActivePartitions() const { return MinActivePartitions_; };
uint64_t GetMaxActivePartitions() const { return MaxActivePartitions_; };
TAutoPartitioningSettings GetAutoPartitioningSettings() const { return AutoPartitioningSettings_; };
private:
uint64_t MinActivePartitions_;
uint64_t MaxActivePartitions_;
TAutoPartitioningSettings AutoPartitioningSettings_;
};

struct TCreateStreamSettings : public NYdb::TOperationRequestSettings<TCreateStreamSettings> {
FLUENT_SETTING(uint32_t, ShardCount);
FLUENT_SETTING_OPTIONAL(uint32_t, RetentionPeriodHours);
FLUENT_SETTING_OPTIONAL(uint32_t, RetentionStorageMegabytes);
FLUENT_SETTING(uint64_t, WriteQuotaKbPerSec);
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);


FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
TPartitioningSettingsBuilder<TCreateStreamSettings> BeginConfigurePartitioningSettings();
};

template<typename TSettings>
struct TAutoPartitioningSettingsBuilder {
using TSelf = TAutoPartitioningSettingsBuilder<TSettings>;
public:
TAutoPartitioningSettingsBuilder(TPartitioningSettingsBuilder<TSettings>& parent, TAutoPartitioningSettings& settings): Parent_(parent), Settings_(settings) {}

TSelf Strategy(EAutoPartitioningStrategy value) {
Settings_.Strategy_ = value;
return *this;
}

TSelf StabilizationWindow(TDuration value) {
Settings_.StabilizationWindow_ = value;
return *this;
}

TSelf DownUtilizationPercent(uint32_t value) {
Settings_.DownUtilizationPercent_ = value;
return *this;
}

TSelf UpUtilizationPercent(uint32_t value) {
Settings_.UpUtilizationPercent_ = value;
return *this;
}

TPartitioningSettingsBuilder<TSettings>& EndConfigureAutoPartitioningSettings() {
return Parent_;
}

private:
TPartitioningSettingsBuilder<TSettings>& Parent_;
TAutoPartitioningSettings& Settings_;
};

template<typename TSettings>
struct TPartitioningSettingsBuilder {
using TSelf = TPartitioningSettingsBuilder;
public:
TPartitioningSettingsBuilder(TSettings& parent): Parent_(parent) {}

TSelf MinActivePartitions(uint64_t value) {
if (!Parent_.PartitioningSettings_.has_value()) {
Parent_.PartitioningSettings_.emplace();
}
(*Parent_.PartitioningSettings_).MinActivePartitions_ = value;
return *this;
}

TSelf MaxActivePartitions(uint64_t value) {
if (!Parent_.PartitioningSettings_.has_value()) {
Parent_.PartitioningSettings_.emplace();
}
(*Parent_.PartitioningSettings_).MaxActivePartitions_ = value;
return *this;
}

TAutoPartitioningSettingsBuilder<TSettings> BeginConfigureAutoPartitioningSettings() {
if (!Parent_.PartitioningSettings_.has_value()) {
Parent_.PartitioningSettings_.emplace();
}
return {*this, (*Parent_.PartitioningSettings_).AutoPartitioningSettings_};
}

TSettings& EndConfigurePartitioningSettings() {
return Parent_;
}

private:
TSettings& Parent_;
};

struct TListStreamsSettings : public NYdb::TOperationRequestSettings<TListStreamsSettings> {
FLUENT_SETTING(uint32_t, Limit);
FLUENT_SETTING(std::string, ExclusiveStartStreamName);
Expand Down Expand Up @@ -155,6 +300,8 @@ namespace NYdb::NDataStreams::V1 {
FLUENT_SETTING(uint64_t, WriteQuotaKbPerSec);
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);

FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
TPartitioningSettingsBuilder<TUpdateStreamSettings> BeginConfigurePartitioningSettings();
};
struct TPutRecordSettings : public NYdb::TOperationRequestSettings<TPutRecordSettings> {};
struct TPutRecordsSettings : public NYdb::TOperationRequestSettings<TPutRecordsSettings> {};
Expand Down
36 changes: 34 additions & 2 deletions include/ydb-cpp-sdk/client/draft/ydb_replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#include <optional>

#include <util/datetime/base.h>

namespace Ydb::Replication {
class ConnectionParams;
class DescribeReplicationResult;
class DescribeReplicationResult_Stats;
}

namespace NYdb {
Expand All @@ -22,7 +25,11 @@ namespace NYdb::NReplication {

class TDescribeReplicationResult;
using TAsyncDescribeReplicationResult = NThreading::TFuture<TDescribeReplicationResult>;
struct TDescribeReplicationSettings: public TOperationRequestSettings<TDescribeReplicationSettings> {};

struct TDescribeReplicationSettings: public TOperationRequestSettings<TDescribeReplicationSettings> {
using TSelf = TDescribeReplicationSettings;
FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
};

struct TStaticCredentials {
std::string User;
Expand All @@ -44,6 +51,7 @@ class TConnectionParams: private TCommonClientSettings {

const std::string& GetDiscoveryEndpoint() const;
const std::string& GetDatabase() const;
bool GetEnableSsl() const;

ECredentials GetCredentials() const;
const TStaticCredentials& GetStaticCredentials() const;
Expand All @@ -56,7 +64,30 @@ class TConnectionParams: private TCommonClientSettings {
> Credentials_;
};

struct TRunningState {};
class TStats {
public:
TStats() = default;
TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats);

const std::optional<TDuration>& GetLag() const;
const std::optional<float>& GetInitialScanProgress() const;

private:
std::optional<TDuration> Lag_;
std::optional<float> InitialScanProgress_;
};

class TRunningState {
public:
TRunningState() = default;
explicit TRunningState(const TStats& stats);

const TStats& GetStats() const;

private:
TStats Stats_;
};

struct TDoneState {};

class TErrorState {
Expand All @@ -77,6 +108,7 @@ class TReplicationDescription {
uint64_t Id;
std::string SrcPath;
std::string DstPath;
TStats Stats;
std::optional<std::string> SrcChangefeedName;
};

Expand Down
25 changes: 21 additions & 4 deletions include/ydb-cpp-sdk/client/query/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ namespace NYdb {
namespace NRetry::Async {
template <typename TClient, typename TAsyncStatusType>
class TRetryContext;
}
} // namespace NRetry::Async
namespace NRetry::Sync {
template <typename TClient, typename TStatusType>
class TRetryContext;
} // namespace NRetry::Sync
}

namespace NYdb::NQuery {
Expand Down Expand Up @@ -55,10 +59,15 @@ class TSession;
class TQueryClient {
friend class TSession;
friend class NRetry::Async::TRetryContext<TQueryClient, TAsyncExecuteQueryResult>;
friend class NRetry::Async::TRetryContext<TQueryClient, TAsyncStatus>;
friend class NRetry::Sync::TRetryContext<TQueryClient, TStatus>;

public:
using TQueryFunc = std::function<TAsyncExecuteQueryResult(TSession session)>;
using TQueryWithoutSessionFunc = std::function<TAsyncExecuteQueryResult(TQueryClient& client)>;
using TQueryResultFunc = std::function<TAsyncExecuteQueryResult(TSession session)>;
using TQueryFunc = std::function<TAsyncStatus(TSession session)>;
using TQuerySyncFunc = std::function<TStatus(TSession session)>;
using TQueryWithoutSessionFunc = std::function<TAsyncStatus(TQueryClient& client)>;
using TQueryWithoutSessionSyncFunc = std::function<TStatus(TQueryClient& client)>;
using TSettings = TClientSettings;
using TSession = TSession;
using TCreateSessionSettings = TCreateSessionSettings;
Expand All @@ -79,7 +88,15 @@ class TQueryClient {
TAsyncExecuteQueryIterator StreamExecuteQuery(const std::string& query, const TTxControl& txControl,
const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings());

TAsyncExecuteQueryResult RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());
TAsyncExecuteQueryResult RetryQuery(TQueryResultFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());

TAsyncStatus RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());

TAsyncStatus RetryQuery(TQueryWithoutSessionFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());

TStatus RetryQuery(const TQuerySyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());

TStatus RetryQuery(const TQueryWithoutSessionSyncFunc& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());

TAsyncExecuteQueryResult RetryQuery(const std::string& query, const TTxControl& txControl,
TDuration timeout, bool isIndempotent);
Expand Down
2 changes: 2 additions & 0 deletions include/ydb-cpp-sdk/client/query/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> {
FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute);
FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None);
FLUENT_SETTING_OPTIONAL(bool, ConcurrentResultSets);
FLUENT_SETTING(std::string, PoolId);
};

struct TBeginTxSettings : public TRequestSettings<TBeginTxSettings> {};
Expand All @@ -97,6 +98,7 @@ struct TExecuteScriptSettings : public TOperationRequestSettings<TExecuteScriptS
FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute);
FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None);
FLUENT_SETTING(TDuration, ResultsTtl);
FLUENT_SETTING(std::string, PoolId);
};

class TQueryContent {
Expand Down
10 changes: 9 additions & 1 deletion include/ydb-cpp-sdk/client/scheme/scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace Ydb {
class VirtualTimestamp;
namespace Scheme {
class Entry;
class ModifyPermissionsRequest;
class Permissions;
}
}

Expand All @@ -24,6 +26,8 @@ struct TPermissions {
{}
std::string Subject;
std::vector<std::string> PermissionNames;

void SerializeTo(::Ydb::Scheme::Permissions& proto) const;
};

enum class ESchemeEntryType : i32 {
Expand All @@ -42,7 +46,8 @@ enum class ESchemeEntryType : i32 {
Topic = 17,
ExternalTable = 18,
ExternalDataSource = 19,
View = 20
View = 20,
ResourcePool = 21,
};

struct TVirtualTimestamp {
Expand Down Expand Up @@ -77,6 +82,9 @@ struct TSchemeEntry {
TSchemeEntry(const ::Ydb::Scheme::Entry& proto);

void Out(IOutputStream& out) const;

// Fills ModifyPermissionsRequest proto from this entry
void SerializeTo(::Ydb::Scheme::ModifyPermissionsRequest& request) const;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading
Loading