[WIP] Fix default allocator #1127

Open · wants to merge 6 commits into base: master
11 changes: 4 additions & 7 deletions CMakeLists.txt
@@ -91,12 +91,6 @@ if(NOT K2_WITH_CUDA)
set(K2_ENABLE_NVTX OFF CACHE BOOL "" FORCE)
endif()

if(NOT K2_USE_PYTORCH)
message(FATAL_ERROR "\
Please set K2_USE_PYTORCH to ON.
Support for other frameworks will be added later")
endif()

set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
@@ -286,15 +280,18 @@ endif()
list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)


include(pybind11)
if(K2_USE_PYTORCH)
message(STATUS "Build k2 with Pytorch.")
include(pybind11)
add_definitions(-DK2_USE_PYTORCH)
include(torch)
configure_file(
${PROJECT_SOURCE_DIR}/k2/python/k2/torch_version.py.in
${PROJECT_SOURCE_DIR}/k2/python/k2/torch_version.py @ONLY
)
message(STATUS "Generated ${PROJECT_BINARY_DIR}/torch_version.py")
else()
message(STATUS "Build k2 without Pytorch.")
endif()

if(K2_WITH_CUDA)
2 changes: 1 addition & 1 deletion k2/CMakeLists.txt
@@ -1,7 +1,7 @@
add_subdirectory(csrc)
add_subdirectory(python)

if(K2_USE_PYTORCH)
add_subdirectory(python)
# We use K2_TORCH_VERSION instead of TORCH_VERSION
# since TORCH_VERSION may contain something like "+cpu", "+cu113"
if(K2_TORCH_VERSION VERSION_GREATER_EQUAL 1.8 OR NOT K2_WITH_CUDA)
4 changes: 2 additions & 2 deletions k2/csrc/CMakeLists.txt
@@ -79,14 +79,13 @@ set(context_srcs
thread_pool.cu
timer.cu
top_sort.cu
torch_util.cu
utils.cu
nbest.cu
)


if(K2_USE_PYTORCH)
list(APPEND context_srcs pytorch_context.cu)
list(APPEND context_srcs pytorch_context.cu torch_util.cu)
else()
list(APPEND context_srcs default_context.cu)
endif()
@@ -166,6 +165,7 @@ if(K2_ENABLE_TESTS)
array_ops_test.cu
array_test.cu
connect_test.cu
default_context_test.cu
dtype_test.cu
fsa_algo_test.cu
fsa_test.cu
102 changes: 84 additions & 18 deletions k2/csrc/default_context.cu
@@ -18,21 +18,22 @@
*/

#include <cstdlib>
#include <cstring>
#include <mutex> // NOLINT

#include "k2/csrc/context.h"
#include "k2/csrc/cub.h"
#include "k2/csrc/device_guard.h"
#include "k2/csrc/log.h"
#include "k2/csrc/nvtx.h"

namespace k2 {

static constexpr std::size_t kAlignment = 64;

// TODO(haowen): most of implementations below should be updated later.
class CpuContext : public Context {
public:
CpuContext() = default;
ContextPtr GetCpuContext() override { return shared_from_this(); }
DeviceType GetDeviceType() const override { return kCpu; }

void *Allocate(std::size_t bytes, void **deleter_context) override {
@@ -52,65 +53,126 @@ class CpuContext : public Context {
void Deallocate(void *data, void * /*deleter_context*/) override {
free(data);
}

void CopyDataTo(size_t num_bytes, const void *src, ContextPtr dst_context,
void *dst) override {
DeviceType device_type = dst_context->GetDeviceType();
switch (device_type) {
case kCpu:
memcpy(dst, src, num_bytes);
break;
case kCuda: {
// CPU -> CUDA
DeviceGuard guard(dst_context);
ContextPtr pinned_context = GetPinnedContext();
auto region = NewRegion(pinned_context, num_bytes);
memcpy(region->data, src, num_bytes);
pinned_context->CopyDataTo(num_bytes, region->data, dst_context, dst);
break;
}
default:
K2_LOG(FATAL) << "Unsupported device type: " << device_type;
break;
}
}
};

class CudaContext : public Context {
public:
explicit CudaContext(int32_t gpu_id) : gpu_id_(gpu_id) {
if (gpu_id_ != -1) {
#ifdef K2_WITH_CUDA
if (gpu_id != -1) {
auto ret = cudaSetDevice(gpu_id_);
K2_CHECK_CUDA_ERROR(ret);
} else {
int current_gpu_id;
auto ret = cudaGetDevice(&current_gpu_id);
K2_CHECK_CUDA_ERROR(ret);
gpu_id_ = current_gpu_id;
}
// TODO(haowen): choose one from available GPUs if gpu_id == -1?
// and handle GPU ids from multiple machines.
auto ret = cudaStreamCreate(&stream_);
K2_CHECK_CUDA_ERROR(ret);
allocator_ = new cub::CachingDeviceAllocator();
#else
K2_LOG(FATAL) << "Unreachable code.";
#endif
}
ContextPtr GetCpuContext() override { return k2::GetCpuContext(); }
DeviceType GetDeviceType() const override { return kCuda; }
int32_t GetDeviceId() const override { return gpu_id_; }

void *Allocate(std::size_t bytes, void **deleter_context) override {
void *p = nullptr;
if (bytes) {
auto ret = cudaMalloc(&p, bytes);
K2_CHECK_CUDA_ERROR(ret);
}
#ifdef K2_WITH_CUDA
DeviceGuard guard(gpu_id_);
// the default stream is 0
auto ret = allocator_->DeviceAllocate(&p, bytes);
[Review comment by the PR author]
Here, the allocator allocates memory on the default stream 0; we could also pass an explicit stream to it.
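
For reference, cub::CachingDeviceAllocator::DeviceAllocate accepts an optional active_stream argument, so a stream could be threaded through here. A minimal standalone sketch of that usage (not the PR's code; the 1024-byte size is arbitrary):

#include <cub/util_allocator.cuh>

int main() {
  cub::CachingDeviceAllocator allocator;
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  void *p = nullptr;
  // Associate the allocation with `stream` instead of the default stream 0,
  // so the caching allocator can safely reuse the block on that stream.
  cudaError_t ret = allocator.DeviceAllocate(&p, 1024, stream);
  // ... launch kernels on `stream` that use p ...
  allocator.DeviceFree(p);
  cudaStreamDestroy(stream);
  return ret == cudaSuccess ? 0 : 1;
}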

K2_CHECK_CUDA_ERROR(ret);
if (deleter_context != nullptr) *deleter_context = nullptr;
#endif
return p;
}

void CopyDataTo(size_t num_bytes, const void *src, ContextPtr dst_context,
void *dst) override {
DeviceType device_type = dst_context->GetDeviceType();
switch (device_type) {
case kCpu: {
cudaError_t ret =
cudaMemcpy(dst, src, num_bytes, cudaMemcpyDeviceToHost);
K2_CHECK_CUDA_ERROR(ret);
break;
}
case kCuda: {
DeviceGuard guard(dst_context);
cudaError_t ret =
cudaMemcpyAsync(dst, src, num_bytes, cudaMemcpyDeviceToDevice,
dst_context->GetCudaStream());
K2_CHECK_CUDA_ERROR(ret);
break;
}
default:
K2_LOG(FATAL) << "Unsupported device type: " << device_type;
break;
}
}

bool IsCompatible(const Context &other) const override {
return other.GetDeviceType() == kCuda && other.GetDeviceId() == gpu_id_;
}

void Deallocate(void *data, void * /*deleter_context*/) override {
auto ret = cudaFree(data);
#ifdef K2_WITH_CUDA
DeviceGuard guard(gpu_id_);
auto ret = allocator_->DeviceFree(data);
K2_CHECK_CUDA_ERROR(ret);
#endif
}

cudaStream_t GetCudaStream() const override {
return g_stream_override.OverrideStream(stream_);
#ifdef K2_WITH_CUDA
return g_stream_override.OverrideStream(0);
[Review comment by the PR author]
Here, in pytorch_context.cu, we get the stream via c10::cuda::getCurrentCUDAStream. I read the PyTorch source code: if we never call setCurrentCUDAStream, the returned stream is always the default stream (i.e. stream 0).

So, can we use the default stream 0 here? (A bare 0 might not be good; we could add a named constant, something like kDefaultStream.)
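
A rough sketch of that suggestion (kDefaultStream is a hypothetical name, not an existing k2 symbol):

// Sketch only: give the legacy default stream a name instead of a bare 0.
// cudaStream_t is a pointer type, so the default (null) stream is nullptr.
constexpr cudaStream_t kDefaultStream = nullptr;

// ... the getter above would then read:
// return g_stream_override.OverrideStream(kDefaultStream);

Naming the constant would also make it easy to later switch every call site to, say, cudaStreamPerThread in one place.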

#else
return kCudaStreamInvalid;
#endif
}

void Sync() const override {
auto ret = cudaStreamSynchronize(stream_);
DeviceGuard guard(gpu_id_);
auto ret = cudaStreamSynchronize(GetCudaStream());
K2_CHECK_CUDA_ERROR(ret);
}

~CudaContext() {
auto ret = cudaStreamDestroy(stream_);
K2_CHECK_CUDA_ERROR(ret);
delete allocator_;
}

private:
int32_t gpu_id_;
cudaStream_t stream_;
cub::CachingDeviceAllocator* allocator_;
};

ContextPtr GetCpuContext() { return std::make_shared<CpuContext>(); }

ContextPtr GetCudaContext(int32_t gpu_id /*= -1*/) {
#ifdef K2_WITH_CUDA
static std::once_flag has_cuda_init_flag;
static bool has_cuda = false;
std::call_once(has_cuda_init_flag, []() {
Expand All @@ -122,9 +184,13 @@ ContextPtr GetCudaContext(int32_t gpu_id /*= -1*/) {
K2_LOG(WARNING) << "CUDA is not available. Return a CPU context.";
});

DeviceGuard guard(gpu_id);
if (has_cuda) return std::make_shared<CudaContext>(gpu_id);

return GetCpuContext();
#else
return GetCpuContext();
#endif
}

} // namespace k2
96 changes: 96 additions & 0 deletions k2/csrc/default_context_test.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2022 Xiaomi Corporation (authors: Wei Kang)
*
* See LICENSE for clarification regarding multiple authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "gtest/gtest.h"
#include "k2/csrc/test_utils.h"
//
#include "k2/csrc/array.h"
#include "k2/csrc/device_guard.h"
#include "k2/csrc/context.h"

namespace k2 {

// Use a separate function because there is a lambda function inside K2_EVAL().
static void TestImpl() {
int num_devices;
auto ret = cudaGetDeviceCount(&num_devices);
K2_LOG(INFO) << "Number of devices: " << num_devices;

// Set the default device to 1
ret = cudaSetDevice(1);
K2_CHECK_CUDA_ERROR(ret);

int current_device;
ret = cudaGetDevice(&current_device);
K2_CHECK_CUDA_ERROR(ret);
EXPECT_EQ(current_device, 1);

ContextPtr c = GetCudaContext(0);
EXPECT_EQ(c->GetDeviceId(), 0);

// Test zero byte allocation.
{
std::vector<int32_t> data;
Array1<int32_t> src(c, data);
EXPECT_EQ(src.Dim(), 0);
}

// the default device should still be 1
ret = cudaGetDevice(&current_device);
K2_CHECK_CUDA_ERROR(ret);
EXPECT_EQ(current_device, 1);

Array1<int32_t> a(c, "[1 2]");
EXPECT_EQ(a.Context()->GetDeviceId(), 0);

// b uses the default device, which is 1
Array1<int32_t> b(GetCudaContext(), "[10 20]");
EXPECT_EQ(b.Context()->GetDeviceId(), 1);

int32_t *a_data = a.Data();
int32_t *b_data = b.Data();

{
DeviceGuard guard(0);
// a is on device 0
K2_EVAL(
a.Context(), 2, set_a, (int32_t i)->void { a_data[i] += 1; });
CheckArrayData(a, {2, 3});
}

{
DeviceGuard guard(1);
// b is on device 1
K2_EVAL(
b.Context(), 2, set_b, (int32_t i)->void { b_data[i] += 2; });

CheckArrayData(b, {12, 22});
}
}


TEST(DefaultContext, GetCudaContext) {
// skip this test if CUDA is not available or fewer than two devices are present
int n;
auto ret = cudaGetDeviceCount(&n);
if (ret == cudaSuccess && n > 1) {
TestImpl();
}
}

} // namespace k2
2 changes: 1 addition & 1 deletion k2/csrc/pytorch_context.cu
@@ -171,7 +171,7 @@ class PytorchCudaContext : public Context {
return g_stream_override.OverrideStream(
c10::cuda::getCurrentCUDAStream(gpu_id_));
#else
return cudaStream_t{};
return kCudaStreamInvalid;
#endif
}
