Skip to content

Commit

Permalink
src: Continue debugging efforts for multirail multiplexing
Browse files Browse the repository at this point in the history
  • Loading branch information
philipmarshall21 committed Apr 30, 2024
1 parent 8c876a6 commit c9d74f6
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 35 deletions.
105 changes: 78 additions & 27 deletions src/transport_ofi.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#include "runtime.h"
#include "uthash.h"


struct fi_info **prov_list = NULL;
struct shmem_transport_ofi_ep **shmem_transport_ofi_eps = NULL;
size_t shmem_transport_ofi_num_eps = 0;

Expand Down Expand Up @@ -661,23 +661,37 @@ int ofi_mr_reg_external_heap(void)
.offset = 0,
.context = NULL
};

ret = fi_mr_regattr(shmem_transport_ofi_domainfd, &mr_attr, 0, &shmem_transport_ofi_external_heap_mrfd);
OFI_CHECK_RETURN_STR(ret, "fi_mr_regattr (heap) failed");
for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
ret = fi_mr_regattr(/*shmem_transport_ofi_domainfd*/ shmem_transport_ofi_eps[idx]->domain, &mr_attr, 0, &shmem_transport_ofi_external_heap_mrfd);
OFI_CHECK_RETURN_STR(ret, "fi_mr_regattr (heap) failed");
}

#if ENABLE_TARGET_CNTR
ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd,
&shmem_transport_ofi_target_cntrfd->fid,
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to external heap MR failed");
for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd,
/*&shmem_transport_ofi_target_cntrfd->fid*/ &shmem_transport_ofi_eps[idx]->cntr->fid,
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to external heap MR failed");
}

if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) {
for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
ret = fi_ep_bind(shmem_transport_ofi_eps[idx]->ep,
&shmem_transport_ofi_eps[idx]->cntr->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed");
ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd,
&shmem_transport_ofi_eps[idx]->ep->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target EP binding to heap MR failed");
}

/*
ret = fi_ep_bind(shmem_transport_ofi_target_ep,
&shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed");
ret = fi_mr_bind(shmem_transport_ofi_external_heap_mrfd,
&shmem_transport_ofi_target_ep->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target EP binding to heap MR failed");
*/

ret = fi_mr_enable(shmem_transport_ofi_external_heap_mrfd);
OFI_CHECK_RETURN_STR(ret, "target heap MR enable failed");
Expand All @@ -702,7 +716,7 @@ int ofi_mr_reg_bind(uint64_t flags, size_t idx)
/* Bind counter with target memory region for incoming messages */
#if ENABLE_TARGET_CNTR
ret = fi_mr_bind(shmem_transport_ofi_target_mrfd,
&shmem_transport_ofi_target_cntrfd->fid,
/*&shmem_transport_ofi_target_cntrfd->fid*/ &shmem_transport_ofi_eps[idx]->cntr->fid,
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to MR failed");

Expand Down Expand Up @@ -735,19 +749,19 @@ int ofi_mr_reg_bind(uint64_t flags, size_t idx)
/* Bind counter with target memory region for incoming messages */
#if ENABLE_TARGET_CNTR
ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd,
&shmem_transport_ofi_target_cntrfd->fid,
/*&shmem_transport_ofi_target_cntrfd->fid*/ &shmem_transport_ofi_eps[idx]->cntr->fid,
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to heap MR failed");

ret = fi_mr_bind(shmem_transport_ofi_target_data_mrfd,
&shmem_transport_ofi_target_cntrfd->fid,
/*&shmem_transport_ofi_target_cntrfd->fid*/ &shmem_transport_ofi_eps[idx]->cntr->fid,
FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to data MR failed");

#ifdef ENABLE_MR_ENDPOINT
if (shmem_transport_ofi_info.p_info->domain_attr->mr_mode & FI_MR_ENDPOINT) {
ret = fi_ep_bind(/*shmem_transport_ofi_target_ep*/ shmem_transport_ofi_eps[idx]->ep,
&shmem_transport_ofi_target_cntrfd->fid, FI_REMOTE_WRITE);
/*&shmem_transport_ofi_target_cntrfd->fid*/ &shmem_transport_ofi_eps[idx]->cntr->fid, FI_REMOTE_WRITE);
OFI_CHECK_RETURN_STR(ret, "target CNTR binding to target EP failed");

ret = fi_mr_bind(shmem_transport_ofi_target_heap_mrfd,
Expand Down Expand Up @@ -804,7 +818,7 @@ int allocate_recv_cntr_mr(size_t idx)
cntr_attr.wait_obj = FI_WAIT_UNSPEC;

ret = fi_cntr_open(/*shmem_transport_ofi_domainfd*/ shmem_transport_ofi_eps[idx]->domain, &cntr_attr,
&shmem_transport_ofi_target_cntrfd /*FIX*/, NULL);
/*&shmem_transport_ofi_target_cntrfd*/ &shmem_transport_ofi_eps[idx]->cntr /*FIX - May be fixed now*/, NULL);
OFI_CHECK_RETURN_STR(ret, "target CNTR open failed");

#ifdef ENABLE_MR_RMA_EVENT
Expand Down Expand Up @@ -1310,8 +1324,9 @@ int allocate_fabric_resources(struct fabric_info *info)
}

/* fabric domain: define domain of resources physical and logical */
struct fi_info *prov = info->p_info;
for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
struct fi_info *prov = prov_list[idx];

shmem_transport_ofi_eps[idx]->info = prov;
ret = fi_fabric(prov->fabric_attr, /*&shmem_transport_ofi_fabfd*/ &shmem_transport_ofi_eps[idx]->fabric, NULL);
OFI_CHECK_RETURN_STR(ret, "fabric initialization failed");
Expand Down Expand Up @@ -1352,8 +1367,6 @@ int allocate_fabric_resources(struct fabric_info *info)
/*&shmem_transport_ofi_avfd*/ &shmem_transport_ofi_eps[idx]->av,
NULL);
OFI_CHECK_RETURN_STR(ret, "AV creation failed");

prov = prov->next;
}
return ret;
}
Expand All @@ -1366,6 +1379,11 @@ struct fi_info *assign_nic_with_hwloc(struct fi_info *fabric, struct fi_info **p
ret = hwloc_get_proc_last_cpu_location(shmem_internal_topology, getpid(), bindset, HWLOC_CPUBIND_PROCESS);
if (ret < 0) {
RAISE_WARN_MSG("hwloc_get_proc_last_cpu_location failed (%s)\n", strerror(errno));
prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (size_t idx = 0; idx < num_nics; idx++) {
prov_list[idx] = provs[idx];
}
shmem_transport_ofi_num_eps = num_nics;
return provs[shmem_internal_my_pe % num_nics];
}

Expand All @@ -1381,11 +1399,21 @@ struct fi_info *assign_nic_with_hwloc(struct fi_info *fabric, struct fi_info **p
hwloc_obj_t io_device = hwloc_get_pcidev_by_busid(shmem_internal_topology, pci.domain_id, pci.bus_id, pci.device_id, pci.function_id);
if (!io_device) {
RAISE_WARN_MSG("hwloc_get_pcidev_by_busid failed\n");
prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (size_t idx = 0; idx < num_nics; idx++) {
prov_list[idx] = provs[idx];
}
shmem_transport_ofi_num_eps = num_nics;
return provs[shmem_internal_my_pe % num_nics];
};
hwloc_obj_t first_non_io = hwloc_get_non_io_ancestor_obj(shmem_internal_topology, io_device);
if (!first_non_io) {
RAISE_WARN_MSG("hwloc_get_non_io_ancestor_obj failed\n");
prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (size_t idx = 0; idx < num_nics; idx++) {
prov_list[idx] = provs[idx];
}
shmem_transport_ofi_num_eps = num_nics;
return provs[shmem_internal_my_pe % num_nics];
}

Expand All @@ -1402,7 +1430,10 @@ struct fi_info *assign_nic_with_hwloc(struct fi_info *fabric, struct fi_info **p

if (!close_provs) {
RAISE_WARN_MSG("Could not detect any NICs with affinity to the process\n");

prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (size_t idx = 0; idx < num_nics; idx++) {
prov_list[idx] = provs[idx];
}
/* If no 'close' NICs, select from list of all NICs using round-robin assignment */
shmem_transport_ofi_num_eps = num_nics;
return provs[shmem_internal_my_pe % num_nics];
Expand All @@ -1411,18 +1442,25 @@ struct fi_info *assign_nic_with_hwloc(struct fi_info *fabric, struct fi_info **p
last_added->next = NULL;

int idx = 0;
struct fi_info **prov_list = (struct fi_info **) malloc(num_close_nics * sizeof(struct fi_info *));
/*struct fi_info **prov_list = (struct fi_info **) malloc(num_close_nics * sizeof(struct fi_info *));
for (struct fi_info *cur_fabric = close_provs; cur_fabric; cur_fabric = cur_fabric->next) {
prov_list[idx++] = cur_fabric;
}*/
prov_list = (struct fi_info **) malloc(num_close_nics * sizeof(struct fi_info *));
for (struct fi_info *cur_fabric = close_provs; cur_fabric; cur_fabric = cur_fabric->next) {
prov_list[idx++] = cur_fabric;
}


hwloc_bitmap_free(bindset);

struct fi_info *provider = prov_list[shmem_internal_my_pe % num_close_nics];
free(prov_list);
//free(prov_list);

shmem_transport_ofi_num_eps = num_close_nics;
return provider;


}
#endif

Expand Down Expand Up @@ -1576,6 +1614,8 @@ int query_for_fabric(struct fabric_info *info)
info->p_info = NULL;

if (shmem_internal_params.OFI_DISABLE_MULTIRAIL) {
prov_list = (struct fi_info **) malloc(sizeof(struct fi_info *));
prov_list[0] = fabrics_list_head;
info->p_info = fabrics_list_head;
shmem_transport_ofi_num_eps = 1;
}
Expand All @@ -1593,28 +1633,35 @@ int query_for_fabric(struct fabric_info *info)
if (multirail_fabric_list_tail) multirail_fabric_list_tail->next = NULL;

if (num_nics == 0) {
prov_list = (struct fi_info **) malloc(sizeof(struct fi_info *));
prov_list[0] = fallback;
info->p_info = fallback;
shmem_transport_ofi_num_eps = 1;
}
else {
int idx = 0;
struct fi_info **prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
struct fi_info **sorted_prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (struct fi_info *cur_fabric = multirail_fabric_list_head; cur_fabric; cur_fabric = cur_fabric->next) {
prov_list[idx++] = cur_fabric;
sorted_prov_list[idx++] = cur_fabric;
}
qsort(prov_list, num_nics, sizeof(struct fi_info *), compare_nic_names);
qsort(sorted_prov_list, num_nics, sizeof(struct fi_info *), compare_nic_names);
#ifdef USE_HWLOC
info->p_info = assign_nic_with_hwloc(info->p_info, prov_list, num_nics);
info->p_info = assign_nic_with_hwloc(info->p_info, sorted_prov_list, num_nics);
#else
/* Round-robin assignment of NICs to PEs
* FIXME: A more suitable indexing value would be
* shmem_team_my_pe(SHMEM_TEAM_NODE) % num_nics, but it is too early in initialization to
* do that here. We would also want to replace the similar occurrences in the
* assign_nic_with_hwloc function. */
prov_list = (struct fi_info **) malloc(num_nics * sizeof(struct fi_info *));
for (size_t idx = 0; idx < num_nics; idx++) {
prov_list[idx] = sorted_prov_list[idx];
}

info->p_info = prov_list[shmem_internal_my_pe % num_nics];
shmem_transport_ofi_num_eps = num_nics;
#endif
free(prov_list);
//free(prov_list); /* FIX - Add free of prov_list in cleanup */
}
}
if (NULL == info->p_info) {
Expand Down Expand Up @@ -1957,6 +2004,10 @@ int shmem_transport_startup(void)

shmem_transport_ofi_stx_pool = (shmem_transport_ofi_stx_t **) malloc(shmem_transport_ofi_num_eps *
sizeof(shmem_transport_ofi_stx_t *));
for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
shmem_transport_ofi_stx_pool[idx] = NULL;
}

for (size_t idx = 0; idx < shmem_transport_ofi_num_eps; idx++) {
if (shmem_internal_params.OFI_STX_AUTO && shmem_transport_ofi_stx_max == 0) {
RAISE_WARN_STR("STXs disabled, ignoring request for automatic STX management");
Expand Down Expand Up @@ -2254,13 +2305,13 @@ int shmem_transport_fini(void)
#if defined(ENABLE_MR_SCALABLE)
#if defined(ENABLE_REMOTE_VIRTUAL_ADDRESSING)
ret = fi_close(&shmem_transport_ofi_target_mrfd->fid); // Fix?
OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(errno));
OFI_CHECK_ERROR_MSG(ret, "Target MR close failed (%s)\n", fi_strerror(ret));
#else
ret = fi_close(&shmem_transport_ofi_target_heap_mrfd->fid); // Fix?
OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(errno));
OFI_CHECK_ERROR_MSG(ret, "Target heap MR close failed (%s)\n", fi_strerror(ret));

ret = fi_close(&shmem_transport_ofi_target_data_mrfd->fid); // Fix?
OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(errno));
OFI_CHECK_ERROR_MSG(ret, "Target data MR close failed (%s)\n", fi_strerror(ret));
#endif
#else
free(shmem_transport_ofi_target_heap_keys);
Expand Down
16 changes: 8 additions & 8 deletions src/transport_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,11 @@ void shmem_transport_put_scalar(shmem_transport_ctx_t* ctx, void *target, const
shmem_internal_assert(len <= shmem_transport_ofi_max_buffered_send);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[0] /* FIX */);

do {

ret = fi_inject_write(ctx->ep,
ret = fi_inject_write(ctx->ep[0] /* FIX */,
source,
len,
GET_DEST(dst),
Expand Down Expand Up @@ -625,10 +625,10 @@ void shmem_transport_ofi_put_large(shmem_transport_ctx_t* ctx, void *target, con
(size_t) (((uint8_t *) source) + len - frag_source));
polled = 0;

SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[0]) /*FIX*/;

do {
ret = fi_write(ctx->ep,
ret = fi_write(ctx->ep[0] /* FIX */,
frag_source, frag_len, NULL,
GET_DEST(dst), frag_target,
key, NULL);
Expand Down Expand Up @@ -1109,16 +1109,16 @@ void shmem_transport_atomic(shmem_transport_ctx_t* ctx, void *target, const void
uint64_t polled = 0;
uint64_t key;
uint8_t *addr;

;
shmem_transport_ofi_get_mr(target, pe, &addr, &key);

;
shmem_internal_assert(SHMEM_Dtsize[SHMEM_TRANSPORT_DTYPE(datatype)] == len);

SHMEM_TRANSPORT_OFI_CTX_LOCK(ctx);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr);
SHMEM_TRANSPORT_OFI_CNTR_INC(&ctx->pending_put_cntr[0] /* FIX */);

do {
ret = fi_inject_atomic(ctx->ep,
ret = fi_inject_atomic(ctx->ep[0] /* FIX */,
source,
1,
GET_DEST(dst),
Expand Down

0 comments on commit c9d74f6

Please sign in to comment.