Skip to content

Commit

Permalink
Move merge/merge-sort device patterns into separate files (oneapi-src…
Browse files Browse the repository at this point in the history
…#1732)

Signed-off-by: Dmitriy Sobolev <[email protected]>
  • Loading branch information
dmitriy-sobolev authored Jul 26, 2024
1 parent 410161c commit 80cd2c4
Show file tree
Hide file tree
Showing 3 changed files with 428 additions and 349 deletions.
351 changes: 2 additions & 349 deletions include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "sycl_defs.h"
#include "parallel_backend_sycl_utils.h"
#include "parallel_backend_sycl_reduce.h"
#include "parallel_backend_sycl_merge.h"
#include "parallel_backend_sycl_merge_sort.h"
#include "execution_sycl_defs.h"
#include "sycl_iterator.h"
#include "unseq_backend_sycl.h"
Expand Down Expand Up @@ -199,15 +201,6 @@ class __find_or_kernel;
template <typename... _Name>
class __scan_propagate_kernel;

template <typename... _Name>
class __sort_leaf_kernel;

template <typename... _Name>
class __sort_global_kernel;

template <typename... _Name>
class __sort_copy_back_kernel;

template <typename... _Name>
class __scan_single_wg_kernel;

Expand Down Expand Up @@ -1415,346 +1408,6 @@ struct __partial_merge_kernel
}
};

//Searching for an intersection of a merge matrix (n1, n2) diagonal with the Merge Path to define sub-ranges
//to serial merge. For example, a merge matrix for [0,1,1,2,3] and [0,0,2,3] is shown below:
//     0   1  1  2   3
//    ------------------
//    |--->
// 0  | 0 | 1  1  1   1
//    |   |
// 0  | 0 | 1  1  1   1
//    |   ---------->
// 2  | 0   0  0  0 | 1
//    |             ---->
// 3  | 0   0  0  0   0 |
//
// Returns the pair (start1, start2) of positions in __rng1/__rng2 at which the work item
// responsible for output position __i_elem must begin its serial merge
// (invariant: start1 + start2 == __i_elem, clamped to the valid region of the matrix).
template <typename _Rng1, typename _Rng2, typename _Index, typename _Index1, typename _Index2, typename _Compare>
auto
__find_start_point(const _Rng1& __rng1, const _Rng2& __rng2, _Index __i_elem, _Index1 __n1, _Index2 __n2,
                   _Compare __comp)
{
    _Index1 __start1 = 0;
    _Index2 __start2 = 0;
    if (__i_elem < __n2) //a condition to specify upper or lower part of the merge matrix to be processed
    {
        const auto __q = __i_elem;                            //diagonal index
        const auto __n_diag = ::std::min<_Index2>(__q, __n1); //diagonal size

        //binary search for the first '1' on the diagonal [0, 0,..., 0, 1, 1,..., 1]:
        //the element at diagonal position __i is '1' iff __comp(__rng2[__q - __i - 1], __rng1[__i])
        _Index __lo = 0;
        _Index __hi = __n_diag;
        while (__lo < __hi)
        {
            const _Index __mid = __lo + (__hi - __lo) / 2;
            if (__comp(__rng2[__q - __mid - 1], __rng1[__mid]))
                __hi = __mid; //'1' found - the lower bound is at or before __mid
            else
                __lo = __mid + 1; //'0' - the lower bound is strictly after __mid
        }
        __start1 = __lo;
        __start2 = __q - __lo;
    }
    else
    {
        const auto __q = __i_elem - __n2;                           //diagonal index
        const auto __n_diag = ::std::min<_Index1>(__n1 - __q, __n2); //diagonal size

        //same binary search for the lower diagonals; the element at position __i
        //is '1' iff __comp(__rng2[__n2 - __i - 1], __rng1[__q + __i])
        _Index __lo = 0;
        _Index __hi = __n_diag;
        while (__lo < __hi)
        {
            const _Index __mid = __lo + (__hi - __lo) / 2;
            if (__comp(__rng2[__n2 - __mid - 1], __rng1[__q + __mid]))
                __hi = __mid;
            else
                __lo = __mid + 1;
        }
        __start1 = __q + __lo;
        __start2 = __n2 - __lo;
    }
    return std::make_pair(__start1, __start2);
}

// Do serial merge of the data from rng1 (starting from start1) and rng2 (starting from start2) and writing
// to rng3 (starting from start3) in 'chunk' steps, but do not exceed the total size of the sequences (n1 and n2)
template <typename _Rng1, typename _Rng2, typename _Rng3, typename _Index1, typename _Index2, typename _Index3,
          typename _Size1, typename _Size2, typename _Compare>
void
__serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, _Index1 __start1, _Index2 __start2,
               _Index3 __start3, ::std::uint8_t __chunk, _Size1 __n1, _Size2 __n2, _Compare __comp)
{
    ::std::uint8_t __pos = 0;

    // Merge while both sequences have remaining elements and the chunk is not full.
    // On equal elements the one from __rng1 is taken first, which keeps the merge stable.
    while (__pos < __chunk && __start1 < __n1 && __start2 < __n2)
    {
        const auto& __left = __rng1[__start1];
        const auto& __right = __rng2[__start2];
        if (__comp(__right, __left))
        {
            __rng3[__start3 + __pos] = __right;
            ++__start2;
        }
        else
        {
            __rng3[__start3 + __pos] = __left;
            ++__start1;
        }
        ++__pos;
    }

    // Copy the residual of whichever sequence is not yet exhausted;
    // at most one of these loops can execute.
    for (; __pos < __chunk && __start1 < __n1; ++__pos, ++__start1)
        __rng3[__start3 + __pos] = __rng1[__start1];
    for (; __pos < __chunk && __start2 < __n2; ++__pos, ++__start2)
        __rng3[__start3 + __pos] = __rng2[__start2];
}

// Please see the comment for __parallel_for_submitter for optional kernel name explanation
template <typename _IdType, typename _Name>
struct __parallel_merge_submitter;

// Submits a device kernel that merges the two sorted ranges __rng1 and __rng2 into __rng3.
// _IdType is the integral type used for work-item indexing; the caller selects a 32- or
// 64-bit type depending on the total input size.
template <typename _IdType, typename... _Name>
struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_Name...>>
{
    template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
    auto
    operator()(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) const
    {
        using _Size = oneapi::dpl::__internal::__difference_t<_Range3>;

        auto __n1 = __rng1.size();
        auto __n2 = __rng2.size();

        // At least one input sequence must be non-empty; empty-input dispatch is
        // presumably handled by the caller - verify against call sites.
        assert(__n1 > 0 || __n2 > 0);

        _PRINT_INFO_IN_DEBUG_MODE(__exec);

        // Each work item produces __chunk consecutive output elements; the larger CPU
        // value reduces the number of work items (tuning constants).
        const ::std::uint8_t __chunk = __exec.queue().get_device().is_cpu() ? 128 : 4;
        const _Size __n = __n1 + __n2;
        const _Size __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk);

        auto __event = __exec.queue().submit([&](sycl::handler& __cgh) {
            oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3);
            __cgh.parallel_for<_Name...>(sycl::range</*dim=*/1>(__steps), [=](sycl::item</*dim=*/1> __item_id) {
                // Each work item independently finds its merge-path split point and then
                // serially merges its __chunk-sized piece of the output.
                const _IdType __i_elem = __item_id.get_linear_id() * __chunk;
                const auto __start = __find_start_point(__rng1, __rng2, __i_elem, __n1, __n2, __comp);
                __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __chunk, __n1, __n2,
                               __comp);
            });
        });
        // Non-blocking: the returned future wraps the submission event for the caller to wait on.
        return __future(__event);
    }
};

template <typename... _Name>
class __merge_kernel_name;

// Merges two sorted ranges on the device, dispatching to the submitter with the
// narrowest work-item index type that can address every output element:
// 32-bit indices are preferred, 64-bit ones are used only when the output is too large.
template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
auto
__parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, _Range1&& __rng1,
                 _Range2&& __rng2, _Range3&& __rng3, _Compare __comp)
{
    using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>;

    const auto __total_n = __rng1.size() + __rng2.size();
    if (__total_n > std::numeric_limits<::std::uint32_t>::max())
    {
        using _WiIndex = ::std::uint64_t;
        using _WideMergeKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __merge_kernel_name<_CustomName, _WiIndex>>;
        return __parallel_merge_submitter<_WiIndex, _WideMergeKernel>()(
            ::std::forward<_ExecutionPolicy>(__exec), ::std::forward<_Range1>(__rng1),
            ::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3), __comp);
    }
    else
    {
        using _WiIndex = ::std::uint32_t;
        using _NarrowMergeKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __merge_kernel_name<_CustomName, _WiIndex>>;
        return __parallel_merge_submitter<_WiIndex, _NarrowMergeKernel>()(
            ::std::forward<_ExecutionPolicy>(__exec), ::std::forward<_Range1>(__rng1),
            ::std::forward<_Range2>(__rng2), ::std::forward<_Range3>(__rng3), __comp);
    }
}

//-----------------------------------------------------------------------
// parallel_sort: general implementation
//-----------------------------------------------------------------------
struct __leaf_sort_kernel
{
template <typename _Acc, typename _Size1, typename _Compare>
void
operator()(const _Acc& __acc, const _Size1 __start, const _Size1 __end, _Compare __comp) const
{
for (_Size1 i = __start; i < __end; ++i)
{
for (_Size1 j = __start + 1; j < __start + __end - i; ++j)
{
// forwarding references allow binding of internal tuple of references with rvalue
auto&& __first_item = __acc[j - 1];
auto&& __second_item = __acc[j];
if (__comp(__second_item, __first_item))
{
using ::std::swap;
swap(__first_item, __second_item);
}
}
}
}
};

// Please see the comment for __parallel_for_submitter for optional kernel name explanation
template <typename _IdType, typename _LeafSortName, typename _GlobalSortName, typename _CopyBackName>
struct __parallel_sort_submitter;

// Device merge sort in three stages: (1) bubble-sort small leaves in place,
// (2) iteratively merge adjacent pairs of sorted runs of doubling length,
// ping-ponging the data between the user range and a temporary buffer,
// (3) copy back if the final result landed in the temporary buffer.
// _IdType is the work-item index type (32/64-bit, selected by the caller).
template <typename _IdType, typename... _LeafSortName, typename... _GlobalSortName, typename... _CopyBackName>
struct __parallel_sort_submitter<_IdType, __internal::__optional_kernel_name<_LeafSortName...>,
                                 __internal::__optional_kernel_name<_GlobalSortName...>,
                                 __internal::__optional_kernel_name<_CopyBackName...>>
{
    template <typename _BackendTag, typename _ExecutionPolicy, typename _Range, typename _Compare>
    auto
    operator()(_BackendTag, _ExecutionPolicy&& __exec, _Range&& __rng, _Compare __comp) const
    {
        using _Tp = oneapi::dpl::__internal::__value_t<_Range>;
        using _Size = oneapi::dpl::__internal::__difference_t<_Range>;

        const ::std::size_t __n = __rng.size();
        // n <= 1 is presumably filtered out by the caller - verify against call sites.
        assert(__n > 1);

        // Leaf size is a tuning constant; larger sequential leaves on CPU.
        const bool __is_cpu = __exec.queue().get_device().is_cpu();
        const ::std::uint32_t __leaf = __is_cpu ? 16 : 4;
        _Size __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __leaf);

        // 1. Perform sorting of the leaves of the merge sort tree
        //    (each work item bubble-sorts one __leaf-sized sub-range in place).
        sycl::event __event1 = __exec.queue().submit([&](sycl::handler& __cgh) {
            oneapi::dpl::__ranges::__require_access(__cgh, __rng);
            __cgh.parallel_for<_LeafSortName...>(sycl::range</*dim=*/1>(__steps), [=](sycl::item</*dim=*/1> __item_id)
            {
                const _IdType __i_elem = __item_id.get_linear_id() * __leaf;
                __leaf_sort_kernel()(__rng, __i_elem, std::min<_IdType>(__i_elem + __leaf, __n), __comp);
            });
        });

        // 2. Merge sorting
        // Each round merges adjacent pairs of sorted runs of length __n_sorted from the
        // current source (__rng or __temp) into the other; __data_in_temp tracks which
        // of the two currently holds the freshest data.
        oneapi::dpl::__par_backend_hetero::__buffer<_ExecutionPolicy, _Tp> __temp_buf(__exec, __n);
        auto __temp = __temp_buf.get_buffer();
        bool __data_in_temp = false;
        _IdType __n_sorted = __leaf;
        const ::std::uint32_t __chunk = __is_cpu ? 32 : 4; // output elements produced per work item
        __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk);

        // Number of merge rounds: log2 of the size padded to a power of two, minus the
        // rounds already covered by the sorted leaves. __n_power2 and __leaf are exact
        // powers of two, so ::std::log2 yields exact integers here.
        const ::std::size_t __n_power2 = oneapi::dpl::__internal::__dpl_bit_ceil(__n);
        const ::std::int64_t __n_iter = ::std::log2(__n_power2) - ::std::log2(__leaf);
        for (::std::int64_t __i = 0; __i < __n_iter; ++__i)
        {
            // __n_sorted and __data_in_temp are captured by value: each round's kernel
            // must see this round's values while the host loop keeps mutating them.
            __event1 = __exec.queue().submit([&, __n_sorted, __data_in_temp](sycl::handler& __cgh) {
                __cgh.depends_on(__event1); // rounds are strictly ordered

                oneapi::dpl::__ranges::__require_access(__cgh, __rng);
                // no_init: the kernel fully overwrites the destination, so the previous
                // contents of the temporary buffer need not be preserved.
                sycl::accessor __dst(__temp, __cgh, sycl::read_write, sycl::no_init);

                __cgh.parallel_for<_GlobalSortName...>(sycl::range</*dim=*/1>(__steps), [=](sycl::item</*dim=*/1> __item_id)
                {
                    const _IdType __i_elem = __item_id.get_linear_id() * __chunk; // global output position
                    const auto __i_elem_local = __i_elem % (__n_sorted * 2); // position inside the merged pair of runs

                    // Bounds of the two runs being merged; all are clamped to __n so the
                    // last (possibly incomplete) pair is handled correctly.
                    const auto __offset = ::std::min<_IdType>((__i_elem / (__n_sorted * 2)) * (__n_sorted * 2), __n);
                    const auto __n1 = ::std::min<_IdType>(__offset + __n_sorted, __n) - __offset;
                    const auto __n2 = ::std::min<_IdType>(__offset + __n1 + __n_sorted, __n) - (__offset + __n1);

                    if (__data_in_temp)
                    {
                        const auto& __rng1 = oneapi::dpl::__ranges::drop_view_simple(__dst, __offset);
                        const auto& __rng2 = oneapi::dpl::__ranges::drop_view_simple(__dst, __offset + __n1);

                        // Merge-path split point for this work item, then a serial merge
                        // of __chunk elements written at the global position __i_elem.
                        const auto start = __find_start_point(__rng1, __rng2, __i_elem_local, __n1, __n2, __comp);
                        __serial_merge(__rng1, __rng2, __rng/*__rng3*/, start.first, start.second, __i_elem, __chunk, __n1, __n2, __comp);
                    }
                    else
                    {
                        const auto& __rng1 = oneapi::dpl::__ranges::drop_view_simple(__rng, __offset);
                        const auto& __rng2 = oneapi::dpl::__ranges::drop_view_simple(__rng, __offset + __n1);

                        const auto start = __find_start_point(__rng1, __rng2, __i_elem_local, __n1, __n2, __comp);
                        __serial_merge(__rng1, __rng2, __dst/*__rng3*/, start.first, start.second, __i_elem, __chunk, __n1, __n2, __comp);
                    }
                });
            });
            __n_sorted *= 2;
            __data_in_temp = !__data_in_temp;
        }

        // 3. If the data remained in the temporary buffer then copy it back
        if (__data_in_temp)
        {
            __event1 = __exec.queue().submit([&](sycl::handler& __cgh) {
                __cgh.depends_on(__event1);
                oneapi::dpl::__ranges::__require_access(__cgh, __rng);
                auto __temp_acc = __temp.template get_access<access_mode::read>(__cgh);
                // We cannot use __cgh.copy here because of zip_iterator usage
                __cgh.parallel_for<_CopyBackName...>(sycl::range</*dim=*/1>(__n), [=](sycl::item</*dim=*/1> __item_id) {
                    const _IdType __idx = __item_id.get_linear_id();
                    __rng[__idx] = __temp_acc[__idx];
                });
            });
        }

        // Non-blocking: the future wraps the last submitted event for the caller to wait on.
        return __future(__event1);
    }
};

// Sorts __rng on the device via __parallel_sort_submitter, dispatching with the
// narrowest work-item index type that can address every element: 32-bit indices
// are preferred, 64-bit ones are used only when the input is too large.
template <typename _ExecutionPolicy, typename _Range, typename _Compare>
auto
__parallel_sort_impl(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, _Range&& __rng,
                     _Compare __comp)
{
    using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>;

    if (__rng.size() > std::numeric_limits<::std::uint32_t>::max())
    {
        using _WiIndex = ::std::uint64_t;
        using _WideLeafKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_leaf_kernel<_CustomName, _WiIndex>>;
        using _WideGlobalKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_global_kernel<_CustomName, _WiIndex>>;
        using _WideCopyBackKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_copy_back_kernel<_CustomName, _WiIndex>>;
        return __parallel_sort_submitter<_WiIndex, _WideLeafKernel, _WideGlobalKernel, _WideCopyBackKernel>()(
            oneapi::dpl::__internal::__device_backend_tag{}, ::std::forward<_ExecutionPolicy>(__exec),
            ::std::forward<_Range>(__rng), __comp);
    }
    else
    {
        using _WiIndex = ::std::uint32_t;
        using _NarrowLeafKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_leaf_kernel<_CustomName, _WiIndex>>;
        using _NarrowGlobalKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_global_kernel<_CustomName, _WiIndex>>;
        using _NarrowCopyBackKernel = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
            __sort_copy_back_kernel<_CustomName, _WiIndex>>;
        return __parallel_sort_submitter<_WiIndex, _NarrowLeafKernel, _NarrowGlobalKernel, _NarrowCopyBackKernel>()(
            oneapi::dpl::__internal::__device_backend_tag{}, ::std::forward<_ExecutionPolicy>(__exec),
            ::std::forward<_Range>(__rng), __comp);
    }
}

// Please see the comment for __parallel_for_submitter for optional kernel name explanation
template <typename _GlobalSortName, typename _CopyBackName>
struct __parallel_partial_sort_submitter;
Expand Down
Loading

0 comments on commit 80cd2c4

Please sign in to comment.