Skip to content

Commit

Permalink
TEST: mpi test fixes (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
valentin petrov committed Jan 17, 2022
1 parent a64de0f commit 1d808c9
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 187 deletions.
41 changes: 19 additions & 22 deletions test/mpi/test_allgather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
TestAllgather::TestAllgather(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
ucc_memory_type_t _mt, ucc_test_team_t &_team,
size_t _max_size) :
TestCase(_team, _mt, _msgsize, _inplace, _max_size)
TestCase(_team, UCC_COLL_TYPE_ALLGATHER, _mt, _msgsize, _inplace, _max_size)
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t dt_size = ucc_dt_size(TEST_DT);
size_t single_rank_count = _msgsize / dt_size;
int rank, size;
int rank, size;

MPI_Comm_rank(team.comm, &rank);
MPI_Comm_size(team.comm, &size);

Expand All @@ -26,21 +27,17 @@ TestAllgather::TestAllgather(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
}

UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, _msgsize * size, _mt));
rbuf = rbuf_mc_header->addr;
check_rbuf = ucc_malloc(_msgsize*size, "check rbuf");
UCC_MALLOC_CHECK(check_rbuf);
rbuf = rbuf_mc_header->addr;
check_buf = ucc_malloc(_msgsize*size, "check buf");
UCC_MALLOC_CHECK(check_buf);
if (TEST_NO_INPLACE == inplace) {
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, _msgsize, _mt));
UCC_CHECK(ucc_mc_alloc(&check_sbuf_mc_header, _msgsize,
UCC_MEMORY_TYPE_HOST));
sbuf = sbuf_mc_header->addr;
check_sbuf = check_sbuf_mc_header->addr;
} else {
args.mask = UCC_COLL_ARGS_FIELD_FLAGS;
args.flags = UCC_COLL_ARGS_FLAG_IN_PLACE;
}

args.coll_type = UCC_COLL_TYPE_ALLGATHER;
if (TEST_NO_INPLACE == inplace) {
args.src.info.buffer = sbuf;
args.src.info.count = single_rank_count;
Expand All @@ -57,22 +54,22 @@ TestAllgather::TestAllgather(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,

ucc_status_t TestAllgather::set_input()
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t dt_size = ucc_dt_size(TEST_DT);
size_t single_rank_count = msgsize / dt_size;
size_t single_rank_size = single_rank_count * dt_size;
int rank;
void *buf, *check_buf;
size_t single_rank_size = single_rank_count * dt_size;
int rank;
void *buf, *check;

MPI_Comm_rank(team.comm, &rank);
if (inplace == TEST_NO_INPLACE) {
buf = sbuf;
check_buf = check_sbuf;
buf = sbuf;
} else {
buf = PTR_OFFSET(rbuf, rank * single_rank_size);
check_buf = PTR_OFFSET(check_rbuf, rank * single_rank_size);
buf = PTR_OFFSET(rbuf, rank * single_rank_size);
}
check = PTR_OFFSET(check_buf, rank * single_rank_size);

init_buffer(buf, single_rank_count, TEST_DT, mem_type, rank);
UCC_CHECK(ucc_mc_memcpy(check_buf, buf, single_rank_size,
UCC_CHECK(ucc_mc_memcpy(check, buf, single_rank_size,
UCC_MEMORY_TYPE_HOST, mem_type));
return UCC_OK;
}
Expand All @@ -90,13 +87,13 @@ ucc_status_t TestAllgather::check()
MPI_Datatype dt = ucc_dt_to_mpi(TEST_DT);
MPI_Request req;

MPI_Iallgather(inplace ? MPI_IN_PLACE : check_sbuf, single_rank_count, dt,
check_rbuf, single_rank_count, dt, team.comm, &req);
MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
check_buf, single_rank_count, dt, team.comm, &req);
do {
MPI_Test(&req, &completed, MPI_STATUS_IGNORE);
ucc_context_progress(team.ctx);
} while(!completed);

return compare_buffers(rbuf, check_rbuf, single_rank_count * size, TEST_DT,
return compare_buffers(rbuf, check_buf, single_rank_count * size, TEST_DT,
mem_type);
}
43 changes: 19 additions & 24 deletions test/mpi/test_allgatherv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ static void fill_counts_and_displacements(int size, int count,
TestAllgatherv::TestAllgatherv(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
ucc_memory_type_t _mt, ucc_test_team_t &_team,
size_t _max_size) :
TestCase(_team, _mt, _msgsize, _inplace, _max_size)
TestCase(_team, UCC_COLL_TYPE_ALLGATHERV, _mt, _msgsize, _inplace,
_max_size)
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t count = _msgsize / dt_size;
int rank, size;
counts = NULL;
size_t count = _msgsize / dt_size;
int rank, size;

counts = NULL;
displacements = NULL;
MPI_Comm_rank(team.comm, &rank);
MPI_Comm_size(team.comm, &size);
Expand All @@ -56,24 +58,20 @@ TestAllgatherv::TestAllgatherv(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
displacements = (int *) ucc_malloc(size * sizeof(uint32_t), "displacements buf");
UCC_MALLOC_CHECK(displacements);
UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, _msgsize * size, _mt));
rbuf = rbuf_mc_header->addr;
check_rbuf = ucc_malloc(_msgsize*size, "check rbuf");
UCC_MALLOC_CHECK(check_rbuf);
rbuf = rbuf_mc_header->addr;
check_buf = ucc_malloc(_msgsize*size, "check buf");
UCC_MALLOC_CHECK(check_buf);
fill_counts_and_displacements(size, count, counts, displacements);

if (TEST_NO_INPLACE == inplace) {
args.mask = 0;
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, counts[rank] * dt_size, _mt));
UCC_CHECK(ucc_mc_alloc(&check_sbuf_mc_header, counts[rank] * dt_size,
UCC_MEMORY_TYPE_HOST));
sbuf = sbuf_mc_header->addr;
check_sbuf = check_sbuf_mc_header->addr;
} else {
args.mask = UCC_COLL_ARGS_FIELD_FLAGS;
args.flags = UCC_COLL_ARGS_FLAG_IN_PLACE;
}

args.coll_type = UCC_COLL_TYPE_ALLGATHERV;
if (TEST_NO_INPLACE == inplace) {
args.src.info.buffer = sbuf;
args.src.info.datatype = TEST_DT;
Expand All @@ -92,19 +90,18 @@ TestAllgatherv::TestAllgatherv(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
ucc_status_t TestAllgatherv::set_input()
{
size_t dt_size = ucc_dt_size(TEST_DT);
int rank;
void *buf, *check_buf;
int rank;
void *buf, *check;

MPI_Comm_rank(team.comm, &rank);
if (inplace == TEST_NO_INPLACE) {
buf = sbuf;
check_buf = check_sbuf;
buf = sbuf;
} else {
buf = PTR_OFFSET(rbuf, displacements[rank] * dt_size);
check_buf = PTR_OFFSET(check_rbuf, displacements[rank] * dt_size);
buf = PTR_OFFSET(rbuf, displacements[rank] * dt_size);
}
check = PTR_OFFSET(check_buf, displacements[rank] * dt_size);
init_buffer(buf, counts[rank], TEST_DT, mem_type, rank);
UCC_CHECK(ucc_mc_memcpy(check_buf, buf, counts[rank] * dt_size,
UCC_CHECK(ucc_mc_memcpy(check, buf, counts[rank] * dt_size,
UCC_MEMORY_TYPE_HOST, mem_type));
return UCC_OK;
}
Expand All @@ -127,22 +124,20 @@ ucc_status_t TestAllgatherv::check()
{
MPI_Datatype dt = ucc_dt_to_mpi(TEST_DT);
int total_count = 0;
int size, rank, completed, count, i;
int size, rank, completed, i;
MPI_Request req;

MPI_Comm_size(team.comm, &size);
MPI_Comm_rank(team.comm, &rank);
count = counts[rank];
for (i = 0 ; i < size; i++) {
total_count += counts[i];
}
MPI_Iallgatherv((inplace == TEST_INPLACE) ? MPI_IN_PLACE : check_sbuf,
count, dt, check_rbuf, (int*)counts, (int*)displacements,
dt, team.comm, &req);
MPI_Iallgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, check_buf, (int*)counts,
(int*)displacements, dt, team.comm, &req);
do {
MPI_Test(&req, &completed, MPI_STATUS_IGNORE);
ucc_context_progress(team.ctx);
} while(!completed);

return compare_buffers(rbuf, check_rbuf, total_count, TEST_DT, mem_type);
return compare_buffers(rbuf, check_buf, total_count, TEST_DT, mem_type);
}
38 changes: 15 additions & 23 deletions test/mpi/test_allreduce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,28 @@ TestAllreduce::TestAllreduce(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
ucc_datatype_t _dt, ucc_reduction_op_t _op,
ucc_memory_type_t _mt, ucc_test_team_t &_team,
size_t _max_size) :
TestCase(_team, _mt, _msgsize, _inplace, _max_size)
TestCase(_team, UCC_COLL_TYPE_ALLREDUCE, _mt, _msgsize, _inplace, _max_size)
{
size_t dt_size = ucc_dt_size(_dt);
size_t count = _msgsize/dt_size;
int rank;
size_t count = _msgsize/dt_size;
int rank;

MPI_Comm_rank(team.comm, &rank);
op = _op;
dt = _dt;
args.coll_type = UCC_COLL_TYPE_ALLREDUCE;

if (skip_reduce(test_max_size < _msgsize, TEST_SKIP_MEM_LIMIT,
team.comm)) {
return;
}

UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, _msgsize, _mt));
rbuf = rbuf_mc_header->addr;
check_rbuf = ucc_malloc(_msgsize, "check rbuf");
UCC_MALLOC_CHECK(check_rbuf);
rbuf = rbuf_mc_header->addr;
check_buf = ucc_malloc(_msgsize, "check buf");
UCC_MALLOC_CHECK(check_buf);
if (TEST_NO_INPLACE == inplace) {
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, _msgsize, _mt));
UCC_CHECK(ucc_mc_alloc(&check_sbuf_mc_header, _msgsize,
UCC_MEMORY_TYPE_HOST));
sbuf = sbuf_mc_header->addr;
check_sbuf = check_sbuf_mc_header->addr;

sbuf = sbuf_mc_header->addr;
args.src.info.buffer = sbuf;
args.src.info.count = count;
args.src.info.datatype = _dt;
Expand All @@ -64,16 +59,14 @@ ucc_status_t TestAllreduce::set_input()
{
size_t dt_size = ucc_dt_size(dt);
size_t count = msgsize / dt_size;
int rank;
void *buf, *check_buf;
int rank;
void *buf;

MPI_Comm_rank(team.comm, &rank);
if (TEST_NO_INPLACE == inplace) {
buf = sbuf;
check_buf = check_sbuf;
} else {
buf = rbuf;
check_buf = check_rbuf;
}
init_buffer(buf, count, dt, mem_type, rank);
UCC_CHECK(ucc_mc_memcpy(check_buf, buf, count * dt_size,
Expand All @@ -88,13 +81,12 @@ ucc_status_t TestAllreduce::reset_sbuf()

ucc_status_t TestAllreduce::check()
{
size_t count = args.dst.info.count;
MPI_Request req;
int completed;
size_t count = args.dst.info.count;
MPI_Request req;
int completed;
ucc_status_t status;

MPI_Iallreduce(inplace ? MPI_IN_PLACE : check_sbuf, check_rbuf, count,
ucc_dt_to_mpi(dt),
MPI_Iallreduce(MPI_IN_PLACE, check_buf, count, ucc_dt_to_mpi(dt),
op == UCC_OP_AVG ? MPI_SUM : ucc_op_to_mpi(op), team.comm,
&req);
do {
Expand All @@ -103,13 +95,13 @@ ucc_status_t TestAllreduce::check()
} while(!completed);

if (op == UCC_OP_AVG) {
status = divide_buffer(check_rbuf, team.team->size, count, dt);
status = divide_buffer(check_buf, team.team->size, count, dt);
if (status != UCC_OK) {
return status;
}
}

return compare_buffers(rbuf, check_rbuf, count, dt, mem_type);
return compare_buffers(rbuf, check_buf, count, dt, mem_type);
}

std::string TestAllreduce::str() {
Expand Down
39 changes: 17 additions & 22 deletions test/mpi/test_alltoall.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
TestAlltoall::TestAlltoall(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
ucc_memory_type_t _mt, ucc_test_team_t &_team,
size_t _max_size) :
TestCase(_team, _mt, _msgsize, _inplace, _max_size)
TestCase(_team, UCC_COLL_TYPE_ALLTOALL, _mt, _msgsize, _inplace, _max_size)
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t single_rank_count = _msgsize / dt_size;
Expand All @@ -22,23 +22,18 @@ TestAlltoall::TestAlltoall(size_t _msgsize, ucc_test_mpi_inplace_t _inplace,
MPI_Comm_rank(team.comm, &rank);
MPI_Comm_size(team.comm, &nprocs);

args.coll_type = UCC_COLL_TYPE_ALLTOALL;

if (TEST_SKIP_NONE != skip_reduce(test_max_size < (_msgsize * nprocs),
TEST_SKIP_MEM_LIMIT, team.comm)) {
return;
}

UCC_CHECK(ucc_mc_alloc(&rbuf_mc_header, _msgsize * nprocs, _mt));
rbuf = rbuf_mc_header->addr;
check_rbuf = ucc_malloc(_msgsize * nprocs, "check rbuf");
UCC_MALLOC_CHECK(check_rbuf);
rbuf = rbuf_mc_header->addr;
check_buf = ucc_malloc(_msgsize * nprocs, "check buf");
UCC_MALLOC_CHECK(check_buf);
if (TEST_NO_INPLACE == inplace) {
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, _msgsize * nprocs, _mt));
UCC_CHECK(ucc_mc_alloc(&check_sbuf_mc_header, _msgsize * nprocs,
UCC_MEMORY_TYPE_HOST));
sbuf = sbuf_mc_header->addr;
check_sbuf = check_sbuf_mc_header->addr;
} else {
args.mask = UCC_COLL_ARGS_FIELD_FLAGS;
args.flags = UCC_COLL_ARGS_FLAG_IN_PLACE;
Expand All @@ -63,17 +58,15 @@ ucc_status_t TestAlltoall::set_input()
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t single_rank_count = msgsize / dt_size;
void *buf, *check_buf;
int rank, nprocs;
void *buf;
int rank, nprocs;

MPI_Comm_rank(team.comm, &rank);
MPI_Comm_size(team.comm, &nprocs);
if (TEST_NO_INPLACE == inplace) {
buf = sbuf;
check_buf = check_sbuf;
} else {
buf = rbuf;
check_buf = rbuf;
}
init_buffer(buf, single_rank_count * nprocs, TEST_DT, mem_type, rank);
UCC_CHECK(ucc_mc_memcpy(check_buf, buf, single_rank_count * nprocs * dt_size,
Expand All @@ -83,9 +76,9 @@ ucc_status_t TestAlltoall::set_input()

ucc_status_t TestAlltoall::reset_sbuf()
{
size_t dt_size = ucc_dt_size(TEST_DT);
size_t dt_size = ucc_dt_size(TEST_DT);
size_t single_rank_count = msgsize / dt_size;
int rank, nprocs;
int rank, nprocs;

MPI_Comm_rank(team.comm, &rank);
MPI_Comm_size(team.comm, &nprocs);
Expand All @@ -97,19 +90,21 @@ ucc_status_t TestAlltoall::reset_sbuf()

ucc_status_t TestAlltoall::check()
{
int size, completed;
MPI_Comm_size(team.comm, &size);
size_t single_rank_count = args.src.info.count / size;
int size, completed;
size_t single_rank_count;
MPI_Request req;

MPI_Ialltoall(inplace ? MPI_IN_PLACE : check_sbuf, single_rank_count,
ucc_dt_to_mpi(TEST_DT), check_rbuf, single_rank_count,
ucc_dt_to_mpi(TEST_DT), team.comm, &req);
MPI_Comm_size(team.comm, &size);
single_rank_count = args.src.info.count / size;

MPI_Ialltoall(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
check_buf, single_rank_count, ucc_dt_to_mpi(TEST_DT),
team.comm, &req);
do {
MPI_Test(&req, &completed, MPI_STATUS_IGNORE);
ucc_context_progress(team.ctx);
} while(!completed);

return compare_buffers(rbuf, check_rbuf, single_rank_count * size, TEST_DT,
return compare_buffers(rbuf, check_buf, single_rank_count * size, TEST_DT,
mem_type);
}
Loading

0 comments on commit 1d808c9

Please sign in to comment.