Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-1657: resize pvc with nbsd-lightweight and minikube #1846

Merged
merged 11 commits into from
Sep 26, 2024
2 changes: 2 additions & 0 deletions cloud/blockstore/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ message TNullServiceConfig
message TLocalServiceConfig
{
optional string DataDir = 1;
// Shutdown timeout (in milliseconds).
optional uint32 ShutdownTimeout = 2;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
185 changes: 133 additions & 52 deletions cloud/blockstore/libs/service_local/service_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

#include <cloud/blockstore/config/server.pb.h>

#include <cloud/blockstore/libs/common/block_range.h>
#include <cloud/blockstore/libs/common/iovector.h>
#include <cloud/blockstore/libs/discovery/discovery.h>
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/blockstore/libs/service/storage.h>
#include <cloud/blockstore/libs/service/storage_provider.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/common/timer.h>

#include <library/cpp/protobuf/util/pb_io.h>

Expand Down Expand Up @@ -62,21 +65,41 @@ struct TMountSession
{
const TString SessionId;
const TString ClientId;
const IStoragePtr Storage;
const TStorageAdapter StorageAdapter;

TMountSession(TString clientId, IStoragePtr storage, ui32 blockSize)
const NProto::EVolumeAccessMode AccessMode;
IStoragePtr Storage;
std::unique_ptr<TStorageAdapter> StorageAdapter;

TMountSession(
TString clientId,
NProto::EVolumeAccessMode accessMode,
IStoragePtr storage,
ui32 blockSize,
TDuration storageShutdownTimeout)
: SessionId(CreateGuidAsString())
, ClientId(std::move(clientId))
, AccessMode(accessMode)
, Storage(storage)
, StorageAdapter(
, StorageAdapter(std::make_unique<TStorageAdapter>(
std::move(storage),
antonmyagkov marked this conversation as resolved.
Show resolved Hide resolved
blockSize,
true, // normalize,
TDuration::Zero(), // maxRequestDuration
TDuration::Zero() // shutdownTimeout
)
storageShutdownTimeout))
{}

void UpdateStorage(
IStoragePtr storage,
ui32 blockSize,
TDuration storageShutdownTimeout)
{
Storage = std::move(storage);
StorageAdapter = std::make_unique<TStorageAdapter>(
Storage,
blockSize,
true, // normalize,
TDuration::Zero(), // maxRequestDuration
storageShutdownTimeout);
}
};

using TMountSessionPtr = std::shared_ptr<TMountSession>;
Expand All @@ -86,22 +109,29 @@ using TMountSessionMap = THashMap<TString, TMountSessionPtr>;

struct TMountedVolume
{
const NProto::TVolume Volume;
NProto::TVolume Volume;
TDuration StorageShutdownTimeout;

TMountSessionMap Sessions;
TMutex SessionLock;

TMountedVolume(NProto::TVolume volume)
TMountedVolume(NProto::TVolume volume, TDuration storageShutdownTimeout)
: Volume(std::move(volume))
, StorageShutdownTimeout(storageShutdownTimeout)
{}

TMountSessionPtr CreateSession(TString clientId, IStoragePtr storage)
TMountSessionPtr CreateSession(
TString clientId,
NProto::EVolumeAccessMode accessMode,
IStoragePtr storage)
{
with_lock (SessionLock) {
auto session = std::make_shared<TMountSession>(
std::move(clientId),
accessMode,
std::move(storage),
Volume.GetBlockSize());
Volume.GetBlockSize(),
StorageShutdownTimeout);

Sessions.emplace(session->SessionId, session);
return session;
Expand Down Expand Up @@ -130,6 +160,35 @@ struct TMountedVolume
return false;
}
}

void Resize(
ui64 blocksCount,
const TString& dataPath,
const IStorageProviderPtr& storageProvider)
{
with_lock (SessionLock) {
Volume.SetBlocksCount(blocksCount);
auto volume = Volume;
volume.SetDiskId(dataPath);
for (auto& [_, session]: Sessions) {
// nbsd-lightweight was implemented for test purposes and
// CreateStorage implementation is sync. It is safe to
// call CreateStorage under the lock.
auto storage = storageProvider
->CreateStorage(
volume,
session->ClientId,
session->AccessMode)
.GetValueSync();
if (storage) {
session->UpdateStorage(
std::move(storage),
volume.GetBlockSize(),
StorageShutdownTimeout);
}
}
}
}
};

using TMountedVolumePtr = std::shared_ptr<TMountedVolume>;
Expand All @@ -146,14 +205,19 @@ class TVolumeManager
{
private:
const TString DataDir;
const TDuration StorageShutdownTimeout;
const IStorageProviderPtr StorageProvider;

TMountedVolumeMap MountedVolumes;
TMutex MountLock;

public:
TVolumeManager(const TString& dataDir, IStorageProviderPtr storageProvider)
TVolumeManager(
const TString& dataDir,
TDuration storageShutdownTimeout,
IStorageProviderPtr storageProvider)
: DataDir(dataDir ? dataDir : NFs::CurrentWorkingDirectory())
, StorageShutdownTimeout(storageShutdownTimeout)
, StorageProvider(std::move(storageProvider))
{}

Expand Down Expand Up @@ -202,8 +266,19 @@ class TVolumeManager
TFileOutput out(fileMeta);
SerializeToTextFormat(volume, out);

TFile fileData(MakeDataPath(diskId), EOpenModeFlag::OpenExisting);
auto dataPath = MakeDataPath(diskId);
TFile fileData(dataPath, EOpenModeFlag::OpenExisting);
fileData.Resize(volume.GetBlockSize() * blocksCount);

auto mountedVolume = FindMountedVolume(diskId);
if (!mountedVolume) {
return;
}

mountedVolume->Resize(
blocksCount,
dataPath,
StorageProvider);
}

void DestroyVolume(const TString& diskId)
Expand Down Expand Up @@ -257,25 +332,28 @@ class TVolumeManager
NProto::TVolume volume = mountedVolume->Volume;
volume.SetDiskId(dataPath);
return StorageProvider->CreateStorage(volume, clientId, accessMode)
.Apply([=] (const auto& future) {
auto storage = future.GetValue();
if (!storage) {
.Apply(
antonmyagkov marked this conversation as resolved.
Show resolved Hide resolved
[mountedVolume, clientId, accessMode](const auto& future)
{
auto storage = future.GetValue();
if (!storage) {
NProto::TMountVolumeResponse response;
auto& error = *response.MutableError();
error.SetCode(E_FAIL);
error.SetMessage("Failed to create storage");
return response;
}

auto session = mountedVolume->CreateSession(
clientId,
accessMode,
std::move(storage));

NProto::TMountVolumeResponse response;
auto& error = *response.MutableError();
error.SetCode(E_FAIL);
error.SetMessage("Failed to create storage");
response.SetSessionId(session->SessionId);
*response.MutableVolume() = mountedVolume->Volume;
return response;
}

auto session = mountedVolume->CreateSession(
clientId,
std::move(storage));

NProto::TMountVolumeResponse response;
response.SetSessionId(session->SessionId);
*response.MutableVolume() = mountedVolume->Volume;
return response;
});
});
}

void UnmountVolume(const TString& diskId, const TString& sessionId)
Expand All @@ -302,7 +380,9 @@ class TVolumeManager
it = MountedVolumes.emplace_direct(
ctx,
diskId,
std::make_shared<TMountedVolume>(std::move(volume)));
std::make_shared<TMountedVolume>(
std::move(volume),
StorageShutdownTimeout));
}
return it->second;
}
Expand Down Expand Up @@ -356,6 +436,8 @@ struct TLocalServiceBase

////////////////////////////////////////////////////////////////////////////////

const ui32 kDefaultStorageShutdownTimeoutInMilliseconds = 60000;
antonmyagkov marked this conversation as resolved.
Show resolved Hide resolved

class TLocalService final
: public TLocalServiceBase
{
Expand All @@ -369,7 +451,13 @@ class TLocalService final
IDiscoveryServicePtr discoveryService,
IStorageProviderPtr storageProvider)
: DiscoveryService(std::move(discoveryService))
, VolumeManager(config.GetDataDir(), std::move(storageProvider))
, VolumeManager(
config.GetDataDir(),
TDuration::MilliSeconds(
config.HasShutdownTimeout()
? config.GetShutdownTimeout()
: kDefaultStorageShutdownTimeoutInMilliseconds),
std::move(storageProvider))
{}

void Start() override {}
Expand Down Expand Up @@ -669,7 +757,7 @@ TFuture<NProto::TReadBlocksResponse> TLocalService::ReadBlocks(
<< "Out of bounds read request";
}

return session->StorageAdapter.ReadBlocks(
return session->StorageAdapter->ReadBlocks(
Now(),
std::move(ctx),
std::move(request),
Expand Down Expand Up @@ -708,27 +796,20 @@ TFuture<NProto::TWriteBlocksResponse> TLocalService::WriteBlocks(
<< "Volume not mounted";
}

ui64 startIndex = request->GetStartIndex();
if (startIndex >= volume->Volume.GetBlocksCount()) {
ythrow TServiceError(E_ARGUMENT)
<< "Out of bounds write request";
}

auto sgListOrError = SgListNormalize(
GetSgList(*request),
volume->Volume.GetBlockSize());
if (HasError(sgListOrError)) {
const auto& error = sgListOrError.GetError();
ythrow TServiceError(error.GetCode()) << error.GetMessage();
}
auto sglist = sgListOrError.ExtractResult();
const auto requestRange = TBlockRange64::WithLength(
drbasic marked this conversation as resolved.
Show resolved Hide resolved
request->GetStartIndex(),
CalculateWriteRequestBlockCount(
*request,
volume->Volume.GetBlocksCount()));
bool rangeOk =
TBlockRange64::WithLength(0, volume->Volume.GetBlocksCount())
.Contains(requestRange);

if (startIndex + sglist.size() > volume->Volume.GetBlocksCount()) {
ythrow TServiceError(E_ARGUMENT)
<< "Out of bounds write request";
if (!rangeOk) {
ythrow TServiceError(E_ARGUMENT) << "Out of bounds write request";
}

return session->StorageAdapter.WriteBlocks(
return session->StorageAdapter->WriteBlocks(
Now(),
std::move(ctx),
std::move(request),
Expand Down Expand Up @@ -779,7 +860,7 @@ TFuture<NProto::TZeroBlocksResponse> TLocalService::ZeroBlocks(
<< "Out of bounds write request";
}

return session->StorageAdapter.ZeroBlocks(
return session->StorageAdapter->ZeroBlocks(
Now(),
std::move(ctx),
std::move(request),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ spec:
requests:
storage: 1Gi
limits:
storage: 1Gi
storage: 10Gi
storageClassName: nbs-csi-sc
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ metadata:
name: nbs-csi-sc
provisioner: nbs.csi.nebius.ai
volumeBindingMode: Immediate
allowVolumeExpansion: true
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ spec:
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: csi-resizer
image: registry.k8s.io/sig-storage/csi-resizer:v1.11.2
imagePullPolicy: IfNotPresent
args:
- "--v=5"
- "--csi-address=/csi/csi.sock"
- "--leader-election"
- "--http-endpoint=:8081"
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
resources:
requests:
memory: "64Mi"
cpu: "100m"
limits:
memory: "128Mi"
cpu: "250m"
ports:
- containerPort: 8081
name: http-endpoint
protocol: TCP
volumeMounts:
- name: socket-dir
mountPath: /csi
- name: csi-nbs-driver
image: nbs-csi-driver:latest
imagePullPolicy: IfNotPresent
Expand Down
Loading