[try_put_and_wait] Part 9: Draft tricky implementation of try_put_and_wait for multifunction_node and async_node #1438

Draft
wants to merge 62 commits into
base: master
62 commits
cef3bb8
Add wait_vertex implementation
pavelkumbrasev Apr 15, 2024
f25d898
Improve task_group scalability
pavelkumbrasev Apr 15, 2024
cda2377
Increase wait map size
pavelkumbrasev Apr 15, 2024
63ea3b6
Rename wait in task_group
pavelkumbrasev Apr 15, 2024
b3093e2
Fix compilation and copyright
pavelkumbrasev Apr 15, 2024
0313450
Drop brackets
pavelkumbrasev Apr 15, 2024
204926c
Add missed dtor and deallocation
pavelkumbrasev Apr 15, 2024
5e27eb0
Fix copyright
pavelkumbrasev Apr 15, 2024
2bce523
Fix collaborative_call_once compilation
pavelkumbrasev Apr 15, 2024
c3ea187
Add comment for release in wait_vertex
pavelkumbrasev Apr 15, 2024
69aa6ec
Make names consistent
pavelkumbrasev Apr 15, 2024
4059154
Add missed namespace qualifier
pavelkumbrasev Apr 15, 2024
f6771e4
Fix vertex names
pavelkumbrasev Apr 16, 2024
c984265
Update namespace qualifiers
pavelkumbrasev Apr 16, 2024
d352aa3
Refactor node structure. Replace nodes in parallel_for and parallel_r…
pavelkumbrasev Feb 27, 2024
1d70767
Fix constructor and copyright
pavelkumbrasev Feb 27, 2024
fc7de60
Initial impl
pavelkumbrasev Feb 27, 2024
f6b9a46
Add entry point
pavelkumbrasev Feb 27, 2024
67d8457
Improve scalability of Flow Graph
kboyarinov Feb 27, 2024
6055dc7
Cleanup and increase FG interface version
kboyarinov Feb 28, 2024
94baad0
Fix issues caused by interface version changes
kboyarinov Feb 29, 2024
a8ccbba
Align changes
kboyarinov Apr 17, 2024
e449166
Exclude changes in parallel for
kboyarinov Apr 17, 2024
dc3224c
Exclude changes in task.cpp
kboyarinov Apr 17, 2024
899bd5c
Increase interface version for multiple interfaces
kboyarinov Apr 17, 2024
b4767c6
Temporarly disable map cleanup on dispatcher dtor
kboyarinov Apr 17, 2024
aee5fb2
Fix deduction guides
kboyarinov May 1, 2024
19a86e2
Add implementation for function_node
kboyarinov Jun 11, 2024
dbe50f7
Fix non-CPF build
kboyarinov Jun 11, 2024
660646e
Add try_put_and_wait_production branch to the CI trigger list
kboyarinov Jun 11, 2024
531da5b
Fix test issues
kboyarinov Jun 13, 2024
f97c794
Fix CI issues
kboyarinov Jun 13, 2024
7afdd9d
+ more use of override keyword
kboyarinov Jun 13, 2024
da15916
vertexes->vertices, remove unnecessary iostream include
kboyarinov Jun 21, 2024
92bf552
Remove unnecessary variadics
kboyarinov Jun 25, 2024
2bb5948
Simplify cache impl
kboyarinov Jun 25, 2024
73eb513
Fix unused variable
kboyarinov Jun 25, 2024
f92503a
Add stub for continue_input
kboyarinov Jun 25, 2024
6d82b64
Fix whitespace
kboyarinov Jun 25, 2024
bc01522
Fix issues
kboyarinov Jun 25, 2024
6d6c145
Save progress
kboyarinov Jun 18, 2024
4e05f31
Add implementation for buffering nodes
kboyarinov Jun 20, 2024
842381c
Add to CI
kboyarinov Jun 20, 2024
2c011ed
Fix copyrights
kboyarinov Jun 21, 2024
67226f3
Add missed file
kboyarinov Jun 21, 2024
708d655
Add test definition for try_get with meta
kboyarinov Jun 21, 2024
ed5d05b
Fix metainfo namespace in test
kboyarinov Jun 21, 2024
811b9b9
join_node::try_get stub
kboyarinov Jun 21, 2024
703104a
Allign with base branch
kboyarinov Jun 25, 2024
a2d7187
Add implementation for multifunction and async nodes
kboyarinov Jul 2, 2024
edae460
Merge remote-tracking branch 'origin/master' into dev/kboyarinov/try_…
kboyarinov Aug 1, 2024
db69fca
Fix incorrect alignment with master
kboyarinov Aug 1, 2024
35358fd
[try_put_and_wait] Part 1: Add implementation of try_put_and_wait fea…
kboyarinov Aug 1, 2024
b24bbd4
[try_put_and_wait] Part 3: Add implementation of try_put_and_wait fea…
dnmokhov Aug 1, 2024
33b373d
[try_put_and_wait] Part 4: Add implementation of try_put_and_wait API…
kboyarinov Aug 1, 2024
012f2e0
[try_put_and_wait] Part 5: Add try_put_and_wait API for indexer_node …
kboyarinov Aug 1, 2024
5cd159a
[try_put_and_wait] Part 2: Add implementation of try_put_and_wait fea…
kboyarinov Aug 15, 2024
d67ee73
[try_put_and_wait] Part 6: Add implementation for limiter_node + rese…
kboyarinov Aug 16, 2024
b533e9d
Merge remote-tracking branch 'origin/master' into dev/kboyarinov/try_…
kboyarinov Aug 16, 2024
e387116
[try_put_and_wait] Part 7: implementation of try_put_and_wait for que…
kboyarinov Aug 19, 2024
1ce85d9
[try_put_and_wait] Part 8: Add implementation for key_matching join_n…
kboyarinov Aug 20, 2024
d03edd7
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 21, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -19,7 +19,7 @@ on:
     branches: [master]

   pull_request:
-    branches: [master]
+    branches: [master, dev/kboyarinov/try_put_and_wait_production]
     types:
       - opened
       - synchronize
5 changes: 5 additions & 0 deletions include/oneapi/tbb/detail/_config.h
@@ -521,6 +521,11 @@
 #define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
 #endif

+#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \
+                                                   || TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT)
+#endif
+
 #if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
 #define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
 #endif
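Note on the feature gate: the internal macro is switched on either by the umbrella preview macro or by the feature-specific one, and the rest of the diff relies on a helper, `__TBB_FLOW_GRAPH_METAINFO_ARG`, whose definition is not part of the hunks shown here. The sketch below illustrates the general technique with hypothetical `DEMO_*` macros and a hypothetical `demo_metainfo` type; the comma-prepending shape of the helper is an assumption about how such a macro is usually written, not a quote from the library.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the user-facing preview switches.
#ifndef DEMO_PREVIEW_FEATURES
#define DEMO_PREVIEW_FEATURES 0
#endif
#ifndef DEMO_PREVIEW_TRY_PUT_AND_WAIT
#define DEMO_PREVIEW_TRY_PUT_AND_WAIT 1
#endif

// Internal gate: enabled by the umbrella switch or by the specific switch.
#ifndef DEMO_INTERNAL_TRY_PUT_AND_WAIT
#define DEMO_INTERNAL_TRY_PUT_AND_WAIT (DEMO_PREVIEW_FEATURES \
                                        || DEMO_PREVIEW_TRY_PUT_AND_WAIT)
#endif

// Assumed helper shape: appends ", arg" when the feature is on, nothing otherwise.
#if DEMO_INTERNAL_TRY_PUT_AND_WAIT
#define DEMO_METAINFO_ARG(arg) , arg
#else
#define DEMO_METAINFO_ARG(arg)
#endif

// Hypothetical metainfo payload.
struct demo_metainfo { int waiters; };

// One signature serves both configurations; the extra parameter exists
// only when the gate is enabled.
inline std::string describe(int value DEMO_METAINFO_ARG(const demo_metainfo& info)) {
#if DEMO_INTERNAL_TRY_PUT_AND_WAIT
    return std::to_string(value) + " with " + std::to_string(info.waiters) + " waiter(s)";
#else
    return std::to_string(value);
#endif
}

inline std::string call_describe() {
    std::string text = describe(7 DEMO_METAINFO_ARG(demo_metainfo{2}));
    assert(!text.empty());
    return text;
}
```

With the defaults above, `call_describe()` yields `7 with 2 waiter(s)`; compiling with both `DEMO_PREVIEW_*` switches set to 0 removes the extra parameter entirely.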
66 changes: 57 additions & 9 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
@@ -270,31 +270,70 @@ class forward_task_bypass : public graph_task {
     }
 };

+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+template <typename... Metainfo>
+struct graph_task_base : std::conditional<sizeof...(Metainfo) != 0,
+                                          graph_task_with_message_waiters,
+                                          graph_task>
+{};
+
+template <typename... Metainfo>
+using graph_task_base_t = typename graph_task_base<Metainfo...>::type;
+#endif
+
 //! A task that calls a node's apply_body_bypass function, passing in an input of type Input
 // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
-template< typename NodeType, typename Input >
-class apply_body_task_bypass : public graph_task {
+template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
+class apply_body_task_bypass
+    : public BaseTaskType
+{
     NodeType &my_node;
     Input my_input;

+    using check_metainfo = std::is_same<BaseTaskType, graph_task>;
+    using without_metainfo = std::true_type;
+    using with_metainfo = std::false_type;
+
+    graph_task* call_apply_body_bypass_impl(without_metainfo) {
+        return my_node.apply_body_bypass(my_input
+                                         __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
+    }
+
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    graph_task* call_apply_body_bypass_impl(with_metainfo) {
+        return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
+    }
+#endif
+
+    graph_task* call_apply_body_bypass() {
+        return call_apply_body_bypass_impl(check_metainfo{});
+    }
+
 public:
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    template <typename Metainfo>
+    apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
+                            node_priority_t node_priority, Metainfo&& metainfo )
+        : BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
+        , my_node(n), my_input(i) {}
+#endif
+
-    apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
-                            , node_priority_t node_priority = no_priority
-    ) : graph_task(g, allocator, node_priority),
-        my_node(n), my_input(i) {}
+    apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
+                            node_priority_t node_priority = no_priority )
+        : BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

     d1::task* execute(d1::execution_data& ed) override {
-        graph_task* next_task = my_node.apply_body_bypass( my_input );
+        graph_task* next_task = call_apply_body_bypass();
         if (SUCCESSFULLY_ENQUEUED == next_task)
             next_task = nullptr;
         else if (next_task)
             next_task = prioritize_task(my_node.graph_reference(), *next_task);
-        finalize<apply_body_task_bypass>(ed);
+        BaseTaskType::template finalize<apply_body_task_bypass>(ed);
         return next_task;
     }

     d1::task* cancel(d1::execution_data& ed) override {
-        finalize<apply_body_task_bypass>(ed);
+        BaseTaskType::template finalize<apply_body_task_bypass>(ed);
         return nullptr;
     }
 };
@@ -343,6 +382,15 @@ class threshold_regulator<T, DecrementType,
         return result;
     }

+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    // Intentionally ignore the metainformation
+    // If there are more items associated with passed metainfo to be processed
+    // They should be stored in the buffer before the limiter_node
+    graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
+        return try_put_task(value);
+    }
+#endif
+
     graph& graph_reference() const override {
         return my_node->my_graph;
     }
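Note on the `apply_body_task_bypass` change in this file: the task picks its base class with `std::conditional` on `sizeof...(Metainfo)` and then uses tag dispatch on `std::is_same<BaseTaskType, graph_task>` to decide which `apply_body_bypass` overload to call. A minimal standalone sketch of that technique follows. All names in it (`plain_task`, `task_with_waiters`, `body_task`, `demo_node`) are hypothetical stand-ins, and strings replace the real waiter vertices and task pointers.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical base types: one without waiters, one carrying them.
struct plain_task {};
struct task_with_waiters {
    explicit task_with_waiters(std::string w) : my_waiters(std::move(w)) {}
    const std::string& waiters() const { return my_waiters; }
private:
    std::string my_waiters;
};

// Selects the base by whether any metainfo type was supplied.
template <typename... Metainfo>
using task_base_t = typename std::conditional<sizeof...(Metainfo) != 0,
                                              task_with_waiters, plain_task>::type;

// Hypothetical node with both call flavours.
struct demo_node {
    std::string apply(int v) { return "value " + std::to_string(v); }
    std::string apply(int v, const std::string& w) {
        return "value " + std::to_string(v) + " for " + w;
    }
};

template <typename Node, typename Base = plain_task>
class body_task : public Base {
    Node& my_node;
    int my_input;

    using check_metainfo = std::is_same<Base, plain_task>;

    // Chosen when Base is plain_task.
    std::string call_impl(std::true_type) { return my_node.apply(my_input); }
    // Chosen otherwise; only instantiated when the base really has waiters().
    std::string call_impl(std::false_type) { return my_node.apply(my_input, this->waiters()); }
public:
    body_task(Node& n, int i) : my_node(n), my_input(i) {}

    template <typename Meta>
    body_task(Node& n, int i, Meta&& meta)
        : Base(std::forward<Meta>(meta)), my_node(n), my_input(i) {}

    std::string execute() { return call_impl(check_metainfo{}); }
};

inline std::string run_plain() {
    demo_node n;
    body_task<demo_node> t(n, 1);
    return t.execute();
}

inline std::string run_with_waiters() {
    demo_node n;
    body_task<demo_node, task_base_t<std::string>> t(n, 2, std::string("waiter A"));
    std::string result = t.execute();
    assert(!result.empty());
    return result;
}
```

Because member functions of a class template are instantiated only when used, the `std::false_type` overload never has to compile against a base that lacks `waiters()`. The diff additionally guards its metainfo overload with the preview macro.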
119 changes: 94 additions & 25 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
@@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender<T>, M > {
         // Do not work with the passed pointer here as it may not be fully initialized yet
     }

-    bool get_item( output_type& v ) {
+private:
+    bool get_item_impl( output_type& v
+                        __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
+    {

-        bool msg = false;
+        bool successful_get = false;

         do {
             predecessor_type *src;
@@ -113,18 +116,35 @@ class predecessor_cache : public node_cache< sender<T>, M > {
             }

             // Try to get from this sender
-            msg = src->try_get( v );
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+            if (metainfo_ptr) {
+                successful_get = src->try_get( v, *metainfo_ptr );
+            } else
+#endif
+            {
+                successful_get = src->try_get( v );
+            }

-            if (msg == false) {
+            if (successful_get == false) {
                 // Relinquish ownership of the edge
                 register_successor(*src, *my_owner);
             } else {
                 // Retain ownership of the edge
                 this->add(*src);
             }
-        } while ( msg == false );
-        return msg;
+        } while ( successful_get == false );
+        return successful_get;
     }
+public:
+    bool get_item( output_type& v ) {
+        return get_item_impl(v);
+    }
+
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    bool get_item( output_type& v, message_metainfo& metainfo ) {
+        return get_item_impl(v, &metainfo);
+    }
+#endif

     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
     void reset() {
@@ -157,8 +177,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
         // Do not work with the passed pointer here as it may not be fully initialized yet
     }

-    bool try_reserve( output_type &v ) {
-        bool msg = false;
+private:
+    bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
+        bool successful_reserve = false;

         do {
             predecessor_type* pred = nullptr;
@@ -172,9 +193,16 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
             }

             // Try to get from this sender
-            msg = pred->try_reserve( v );
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+            if (metainfo) {
+                successful_reserve = pred->try_reserve( v, *metainfo );
+            } else
+#endif
+            {
+                successful_reserve = pred->try_reserve( v );
+            }

-            if (msg == false) {
+            if (successful_reserve == false) {
                 typename mutex_type::scoped_lock lock(this->my_mutex);
                 // Relinquish ownership of the edge
                 register_successor( *pred, *this->my_owner );
@@ -183,11 +211,21 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > {
                 // Retain ownership of the edge
                 this->add( *pred);
             }
-        } while ( msg == false );
+        } while ( successful_reserve == false );

-        return msg;
+        return successful_reserve;
     }
+public:
+    bool try_reserve( output_type& v ) {
+        return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
+    }
+
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    bool try_reserve( output_type& v, message_metainfo& metainfo ) {
+        return try_reserve_impl(v, &metainfo);
+    }
+#endif

     bool try_release() {
         reserved_src.load(std::memory_order_relaxed)->try_release();
         reserved_src.store(nullptr, std::memory_order_relaxed);
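Note on the predecessor caches: both `get_item` and `try_reserve` now delegate to one private `*_impl` function that takes an optional metainfo pointer, so the retry loop is written once and the public overloads stay thin. The sketch below shows that shape in isolation. `demo_source`, `demo_metainfo` and `demo_predecessor_cache` are hypothetical, locking is omitted, and a rejected source is simply dropped here, whereas the real code re-registers the edge with the sender.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical metainfo payload.
struct demo_metainfo { std::string tag; };

// Hypothetical sender offering both try_get flavours.
struct demo_source {
    bool has_item;
    int item;
    bool try_get(int& v) {
        if (!has_item) return false;
        v = item;
        has_item = false;
        return true;
    }
    bool try_get(int& v, demo_metainfo& m) {
        if (!try_get(v)) return false;
        m.tag = "from source";
        return true;
    }
};

class demo_predecessor_cache {
    std::deque<demo_source*> my_sources;

    // Single loop shared by both public overloads; a null pointer means
    // "the caller does not want metainfo".
    bool get_item_impl(int& v, demo_metainfo* metainfo_ptr = nullptr) {
        bool successful_get = false;
        do {
            if (my_sources.empty()) return false;
            demo_source* src = my_sources.front();
            my_sources.pop_front();
            successful_get = metainfo_ptr ? src->try_get(v, *metainfo_ptr)
                                          : src->try_get(v);
            if (successful_get) my_sources.push_back(src);  // keep the source cached
        } while (!successful_get);
        return successful_get;
    }
public:
    void add(demo_source& s) { my_sources.push_back(&s); }
    bool get_item(int& v) { return get_item_impl(v); }
    bool get_item(int& v, demo_metainfo& m) { return get_item_impl(v, &m); }
};

inline int demo_get_plain() {
    demo_source full{true, 42};
    demo_predecessor_cache cache;
    cache.add(full);
    int v = -1;
    return cache.get_item(v) ? v : -1;
}

inline std::string demo_get_with_metainfo() {
    demo_source empty{false, 0};
    demo_source full{true, 42};
    demo_predecessor_cache cache;
    cache.add(empty);
    cache.add(full);
    int v = -1;
    demo_metainfo m;
    bool ok = cache.get_item(v, m);
    assert(ok);
    return std::to_string(v) + ":" + m.tag;
}
```

The first source in `demo_get_with_metainfo` is empty, so the loop skips it and takes the item from the second one, filling in the metainfo on the way.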
@@ -268,6 +306,9 @@ class successor_cache : no_copy {
     }

     virtual graph_task* try_put_task( const T& t ) = 0;
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
+#endif
 }; // successor_cache<T>

 //! An abstract cache of successors, specialized to continue_msg
@@ -327,6 +368,9 @@ class successor_cache< continue_msg, M > : no_copy {
     }

     virtual graph_task* try_put_task( const continue_msg& t ) = 0;
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
+#endif
 }; // successor_cache< continue_msg >

 //! A cache of successors that are broadcast to
@@ -336,19 +380,12 @@ class broadcast_cache : public successor_cache<T, M> {
     typedef M mutex_type;
     typedef typename successor_cache<T,M>::successors_type successors_type;

-public:
-
-    broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
-        // Do not work with the passed pointer here as it may not be fully initialized yet
-    }
-
-    // as above, but call try_put_task instead, and return the last task we received (if any)
-    graph_task* try_put_task( const T &t ) override {
+    graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
         graph_task * last_task = nullptr;
         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
         typename successors_type::iterator i = this->my_successors.begin();
         while ( i != this->my_successors.end() ) {
-            graph_task *new_task = (*i)->try_put_task(t);
+            graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
             // workaround for icc bug
             graph& graph_ref = (*i)->graph_reference();
             last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
@@ -365,14 +402,31 @@ class broadcast_cache : public successor_cache<T, M> {
         }
         return last_task;
     }
+public:
+
+    broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
+        // Do not work with the passed pointer here as it may not be fully initialized yet
+    }
+
+    graph_task* try_put_task( const T &t ) override {
+        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
+    }
+
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
+        return try_put_task_impl(t, metainfo);
+    }
+#endif

     // call try_put_task and return list of received tasks
-    bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
+    bool gather_successful_try_puts( const T &t, graph_task_list& tasks
+                                     __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
+    {
         bool is_at_least_one_put_successful = false;
         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
         typename successors_type::iterator i = this->my_successors.begin();
         while ( i != this->my_successors.end() ) {
-            graph_task * new_task = (*i)->try_put_task(t);
+            graph_task * new_task = (*i)->try_put_task(t, metainfo);
             if(new_task) {
                 ++i;
                 if(new_task != SUCCESSFULLY_ENQUEUED) {
@@ -411,11 +465,15 @@ class round_robin_cache : public successor_cache<T, M> {
         return this->my_successors.size();
     }

-    graph_task* try_put_task( const T &t ) override {
+private:
+
+    graph_task* try_put_task_impl( const T &t
+                                   __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
+    {
         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
         typename successors_type::iterator i = this->my_successors.begin();
         while ( i != this->my_successors.end() ) {
-            graph_task* new_task = (*i)->try_put_task(t);
+            graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
             if ( new_task ) {
                 return new_task;
             } else {
@@ -429,6 +487,17 @@ class round_robin_cache : public successor_cache<T, M> {
         }
         return nullptr;
     }
+
+public:
+    graph_task* try_put_task(const T& t) override {
+        return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
+    }
+
+#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
+    graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
+        return try_put_task_impl(t, metainfo);
+    }
+#endif
 };

 #endif // __TBB__flow_graph_cache_impl_H
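Note on the successor caches: the abstract `successor_cache` gains a second pure virtual `try_put_task` overload, and `broadcast_cache` and `round_robin_cache` implement both overloads by forwarding to a single `try_put_task_impl`, passing a default-constructed `message_metainfo{}` when the caller supplied none. A reduced sketch of that arrangement is below. The `demo_*` types are hypothetical, locking is omitted, and the function returns a count of accepting successors instead of a `graph_task*`, which is a simplification and not what the library does.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical metainfo: a list of waiter names.
struct demo_metainfo { std::vector<std::string> waiters; };

// Hypothetical receiver that records "value/number-of-waiters" per message.
struct demo_receiver {
    std::vector<std::string> log;
    bool try_put(int v, const demo_metainfo& m) {
        log.push_back(std::to_string(v) + "/" + std::to_string(m.waiters.size()));
        return true;
    }
};

// Abstract interface with both overloads, mirroring the shape of the diff.
class demo_successor_cache {
public:
    virtual ~demo_successor_cache() = default;
    virtual int try_put(int v) = 0;
    virtual int try_put(int v, const demo_metainfo& m) = 0;
};

class demo_broadcast_cache : public demo_successor_cache {
    std::vector<demo_receiver*> my_successors;

    // The broadcast loop lives in one place.
    int try_put_impl(int v, const demo_metainfo& m) {
        int accepted = 0;
        for (demo_receiver* r : my_successors) {
            if (r->try_put(v, m)) ++accepted;
        }
        return accepted;
    }
public:
    void register_successor(demo_receiver& r) { my_successors.push_back(&r); }

    // No metainfo supplied: forward an empty one.
    int try_put(int v) override { return try_put_impl(v, demo_metainfo{}); }
    int try_put(int v, const demo_metainfo& m) override { return try_put_impl(v, m); }
};

inline std::string demo_broadcast() {
    demo_receiver a, b;
    demo_broadcast_cache cache;
    cache.register_successor(a);
    cache.register_successor(b);

    cache.try_put(1);                 // empty metainfo
    demo_metainfo m;
    m.waiters.push_back("w");
    int accepted = cache.try_put(2, m);
    assert(accepted == 2);

    return a.log[0] + " " + a.log[1] + " " + b.log[1];
}
```

Keeping the loop in one private function means a change to the broadcast policy cannot drift between the two public entry points.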