DAOS-15829 object: fix potential DRAM leak when retry after DTX refre… #14432

Merged
merged 2 commits into from
May 25, 2024
Changes from 1 commit
197 changes: 98 additions & 99 deletions src/object/srv_obj.c
@@ -705,20 +705,21 @@
/* Re-entry case. */
if (orwo->orw_nrs.ca_count != 0) {
D_ASSERT(orwo->orw_nrs.ca_count == nrs_count);
return 0;
}

/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;
D_ASSERT(orwo->orw_data_sizes.ca_count == nrs_count);
D_ASSERT(orwo->orw_nrs.ca_arrays != NULL);
D_ASSERT(orwo->orw_data_sizes.ca_arrays != NULL);
} else {
/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;

orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays =
(void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays = (void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
}

nrs = orwo->orw_nrs.ca_arrays;
data_sizes = orwo->orw_data_sizes.ca_arrays;
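
Note: the hunk above allocates the per-sgl counters and data sizes once, and on re-entry (a handler retried after DTX refresh) only asserts that the previously built arrays are still consistent instead of allocating again. A minimal standalone sketch of that allocate-once pattern, using plain calloc/assert and hypothetical names (struct reply, reply_set_nrs) instead of the DAOS helpers; the 8-byte sizes are placed ahead of the 4-byte counters only to keep the single allocation aligned:

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical reply layout: one allocation carries both arrays. */
struct reply {
	uint64_t *sizes;   /* per-sgl data sizes          */
	uint32_t *nrs;     /* per-sgl output counters     */
	unsigned  count;   /* 0 means "not yet allocated" */
};

/* Allocate the reply arrays once; on re-entry only verify consistency. */
static int reply_set_nrs(struct reply *r, unsigned nrs_count)
{
	if (r->count != 0) {                     /* re-entry after a retry */
		assert(r->count == nrs_count);
		assert(r->nrs != NULL && r->sizes != NULL);
	} else {                                 /* first pass */
		void *buf = calloc(nrs_count,
				   sizeof(uint64_t) + sizeof(uint32_t));

		if (buf == NULL)
			return -ENOMEM;
		r->sizes = buf;                  /* 8-byte items first */
		r->nrs   = (uint32_t *)(r->sizes + nrs_count);
		r->count = nrs_count;
	}
	return 0;
}

int main(void)
{
	struct reply r = { 0 };
	int rc;

	rc = reply_set_nrs(&r, 4);   /* first call allocates    */
	assert(rc == 0);
	rc = reply_set_nrs(&r, 4);   /* retry reuses the buffer */
	assert(rc == 0);
	free(r.sizes);
	return 0;
}
```
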
@@ -858,13 +859,20 @@
*
* The memory will be freed in obj_rw_reply
*/
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = (uint64_t)rc;
rc = 0;
/* Re-entry case. */
if (orwo->orw_iod_csums.ca_count != 0) {
D_ASSERT(orwo->orw_iod_csums.ca_arrays != NULL);
rc = 0;
} else {
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = rc;
rc = 0;
}
}

return rc;
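
Note: this hunk adds a re-entry guard before the checksum allocation and keeps the convention that the allocator returns the number of entries on success, which the caller stores into ca_count before resetting rc to 0. A small self-contained sketch of that flow, with hypothetical names (alloc_csums, prepare_csums) standing in for the real calls:

```c
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical allocator: returns the number of entries on success,
 * or a negative errno-style code on failure. */
static int alloc_csums(unsigned nr, int **out)
{
	*out = calloc(nr, sizeof(**out));
	return *out == NULL ? -ENOMEM : (int)nr;
}

struct csum_reply {
	int      *arrays;
	unsigned  count;   /* non-zero once populated */
};

/* Allocate once; a retried handler reuses the arrays it already built. */
static int prepare_csums(struct csum_reply *r, unsigned nr)
{
	int rc;

	if (r->count != 0)            /* re-entry case: nothing to do */
		return 0;

	rc = alloc_csums(nr, &r->arrays);
	if (rc >= 0) {                /* rc carries the entry count   */
		r->count = (unsigned)rc;
		rc = 0;
	}
	return rc;
}

int main(void)
{
	struct csum_reply r = { 0 };

	printf("first: %d\n", prepare_csums(&r, 8));  /* allocates     */
	printf("retry: %d\n", prepare_csums(&r, 8));  /* reuses arrays */
	free(r.arrays);
	return 0;
}
```
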
@@ -1115,10 +1123,10 @@
if (skips == NULL)
D_ASSERTF(total_nr == iods_nr, "total nr %d, iods_nr %d\n", total_nr, iods_nr);

/* Re-entry case. */
if (orwo->orw_maps.ca_count != 0) {
D_ASSERT(orwo->orw_maps.ca_count == total_nr);
return 0;
/* Re-entry case, iods may be changed, let's re-generate the maps. */
if (orwo->orw_maps.ca_arrays != NULL) {
ds_iom_free(&orwo->orw_maps.ca_arrays, orwo->orw_maps.ca_count);
orwo->orw_maps.ca_count = 0;
}

rc = ds_iom_create(biod, iods, iods_nr, flags, &maps);
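
Note: on re-entry this path no longer returns early; since the iods may have changed between passes, the stale maps are freed and rebuilt. A simplified standalone model of that free-and-rebuild idea, using plain malloc and hypothetical names (struct map_reply, maps_create, set_maps) in place of ds_iom_create/ds_iom_free:

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-iod map array kept in the reply. */
struct map_reply {
	int      *maps;
	unsigned  count;
};

/* Build fresh maps from the current iods (stand-in: just copy them). */
static int maps_create(const int *iods, unsigned nr, int **out)
{
	*out = malloc(nr * sizeof(**out));
	if (*out == NULL)
		return -1;
	memcpy(*out, iods, nr * sizeof(**out));
	return 0;
}

/* On re-entry the inputs may have changed since the first pass, so the
 * stale maps are freed and regenerated instead of being returned as-is. */
static int set_maps(struct map_reply *r, const int *iods, unsigned nr)
{
	if (r->maps != NULL) {
		free(r->maps);
		r->maps  = NULL;
		r->count = 0;
	}
	if (maps_create(iods, nr, &r->maps) != 0)
		return -1;
	r->count = nr;
	return 0;
}

int main(void)
{
	struct map_reply r = { 0 };
	int iods[2] = { 1, 2 };

	set_maps(&r, iods, 2);
	iods[1] = 3;            /* the iods changed before the retry    */
	set_maps(&r, iods, 2);  /* the first maps are freed, not leaked */
	free(r.maps);
	return 0;
}
```
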
@@ -1198,6 +1206,10 @@
int j;
int rc = 0;

/* Re-entry case. */
if (ioc->ioc_free_sgls)
return 0;

for (i = 0; i < nr; i++) {
for (j = 0; j < sgls[i].sg_nr; j++) {
d_iov_t *iov = &sgls[i].sg_iovs[j];
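
Note: the added guard appears to make this step a no-op once ioc_free_sgls is set, so a retried handler does not repeat work it already did. A tiny standalone sketch of such a one-shot flag, with a hypothetical buffers_detached field standing in for the real one:

```c
#include <stdbool.h>
#include <stdio.h>

struct io_context {
	bool buffers_detached;   /* set once the sgl buffers were handed off */
};

/* Hypothetical one-shot step: a retried handler must not repeat it. */
static int detach_buffers(struct io_context *ioc)
{
	if (ioc->buffers_detached)        /* re-entry: already done */
		return 0;

	/* ... real code would walk the sgls and take over their iovs ... */
	puts("detaching buffers");
	ioc->buffers_detached = true;
	return 0;
}

int main(void)
{
	struct io_context ioc = { 0 };

	detach_buffers(&ioc);   /* does the work once  */
	detach_buffers(&ioc);   /* no-op on the retry  */
	return 0;
}
```
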
@@ -1993,7 +2005,7 @@
}

static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct obj_rw_in *orw = crt_req_get(rpc);
daos_iod_t iod = { 0 };
@@ -2007,32 +2019,13 @@
uint8_t *skips = (uint8_t *)&local_skips;
uint32_t nr = 0;
int rc;
int count = 0;

rc = obj_get_iods_offs(orw->orw_oid, &orw->orw_iod_array, &ioc->ioc_oca,
orw->orw_dkey_hash, ioc->ioc_layout_ver, &iods,
&offs, &skips, &csums, &csum_info, &nr);
if (rc != 0)
D_GOTO(out, rc);
again:
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++count % 10 == 3)) {
struct dtx_share_peer *dsp;

dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
"maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
dth->dth_share_tbd_count, count);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
}
if (rc == 0)
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);

out:
if (csums != NULL && csums != &csum && csums != orw->orw_iod_array.oia_iod_csums) {
int i;

@@ -2052,6 +2045,32 @@
return rc;
}

static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;

again:
rc = obj_local_rw_internal_wrap(rpc, ioc, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
"maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
}

return rc;
}

static int
obj_capa_check(struct ds_cont_hdl *coh, bool is_write, bool is_agg_migrate)
{
@@ -2213,7 +2232,7 @@
* 2. The current replica was NOT the old leader if
* with the old pool map version. But it becomes
* the new leader with the new pool map version.
* In the subsequent modification, it may hit
* some 'prepared' DTX when make availability
* check, it will return -DER_INPROGRESS that
* will cause client to retry. It is possible
@@ -3098,45 +3117,12 @@
D_FREE(oeo->oeo_csum_iov.iov_buf);
}

static int
obj_restore_enum_args(crt_rpc_t *rpc, struct ds_obj_enum_arg *des,
struct ds_obj_enum_arg *src)
{
struct obj_key_enum_out *oeo = crt_reply_get(rpc);
struct obj_key_enum_in *oei = crt_req_get(rpc);
int rc;

if (!des->fill_recxs && des->csum_iov.iov_buf != NULL)
daos_iov_free(&des->csum_iov);

*des = *src;

if (des->fill_recxs)
return 0;

if (des->kds != NULL)
memset(des->kds, 0, des->kds_cap * sizeof(daos_key_desc_t));
des->kds_len = 0;

if (oeo->oeo_sgl.sg_iovs == NULL)
return 0;

d_sgl_fini(&oeo->oeo_sgl, true);
rc = daos_sgls_alloc(&oeo->oeo_sgl, &oei->oei_sgl, 1);
if (rc != 0)
return rc;

des->sgl = &oeo->oeo_sgl;
return 0;
}

static int
obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
struct vos_iter_anchors *anchors, struct ds_obj_enum_arg *enum_arg,
daos_epoch_t *e_out)
{
vos_iter_param_t param = { 0 };
struct ds_obj_enum_arg saved_arg;
struct obj_key_enum_in *oei = crt_req_get(rpc);
struct dtx_handle *dth = NULL;
uint32_t flags = 0;
@@ -3199,7 +3185,7 @@
D_ASSERT(opc == DAOS_OBJ_RPC_ENUMERATE);
type = VOS_ITER_DKEY;
param.ip_flags |= VOS_IT_RECX_VISIBLE;
if (daos_anchor_get_flags(&anchors[0].ia_dkey) &
if (daos_anchor_get_flags(&anchors->ia_dkey) &
DIOF_WITH_SPEC_EPOCH) {
/* For obj verification case. */
param.ip_epc_expr = VOS_IT_EPC_RR;
@@ -3229,7 +3215,7 @@
* 'type' to indicate the anchor is on SV tree or EV tree.
*/
if (type == VOS_ITER_SINGLE)
anchors[0].ia_sv = anchors[0].ia_ev;
anchors->ia_sv = anchors->ia_ev;
else if (oei->oei_oid.id_shard % 3 == 1 &&
DAOS_FAIL_CHECK(DAOS_VC_LOST_REPLICA))
D_GOTO(failed, rc = -DER_NONEXIST);
@@ -3245,9 +3231,6 @@
goto failed;
}

anchors[1] = anchors[0];
saved_arg = *enum_arg;

if (oei->oei_flags & ORF_FOR_MIGRATION)
flags = DTX_FOR_MIGRATION;

@@ -3258,16 +3241,12 @@
goto failed;

re_pack:
rc = ds_obj_enum_pack(&param, type, recursive, &anchors[0], enum_arg, vos_iterate, dth);
rc = ds_obj_enum_pack(&param, type, recursive, anchors, enum_arg, vos_iterate, dth);
if (obj_dtx_need_refresh(dth, rc)) {
rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN) {
anchors[0] = anchors[1];
obj_restore_enum_args(rpc, enum_arg, &saved_arg);
if (opc == DAOS_OBJ_RPC_ENUMERATE)
fill_oid(oei->oei_oid, enum_arg);
/* After DTX refresh, re_pack will resume from the position at \@anchors. */
if (rc == -DER_AGAIN)
goto re_pack;
}
}

if ((rc == -DER_KEY2BIG) && opc == DAOS_OBJ_RPC_ENUMERATE &&
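
Note: with obj_restore_enum_args and the backup anchors removed, a retried ds_obj_enum_pack resumes from the position recorded in the anchors rather than restoring a saved copy of the enumeration state. A small standalone sketch of that cursor-style resume, with a hypothetical enum_pack that records its progress in a struct anchor:

```c
#include <stdio.h>

#define ERR_AGAIN (-11)     /* stand-in for -DER_AGAIN */

struct anchor {
	unsigned next;          /* position to resume from */
};

/* Hypothetical pack step: consumes items and records its position in the
 * anchor, so a retry continues where it stopped instead of restarting
 * from a saved copy of the whole enumeration state. */
static int enum_pack(struct anchor *a, unsigned total, unsigned fail_at)
{
	while (a->next < total) {
		if (a->next == fail_at)
			return ERR_AGAIN;     /* conflict: caller refreshes */
		printf("packed item %u\n", a->next);
		a->next++;
	}
	return 0;
}

int main(void)
{
	struct anchor a = { 0 };
	unsigned fail_at = 3;
	int rc;

re_pack:
	rc = enum_pack(&a, 6, fail_at);
	if (rc == ERR_AGAIN) {
		fail_at = (unsigned)-1;  /* pretend the refresh resolved it   */
		goto re_pack;            /* resumes at a.next; 0-2 not redone */
	}
	return rc;
}
```
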
@@ -3293,7 +3272,7 @@
rc = rc_tmp;

if (type == VOS_ITER_SINGLE)
anchors[0].ia_ev = anchors[0].ia_sv;
anchors->ia_ev = anchors->ia_sv;

D_DEBUG(DB_IO, ""DF_UOID" iterate "DF_X64"-"DF_X64" type %d tag %d"
" rc %d\n", DP_UOID(oei->oei_oid), param.ip_epr.epr_lo,
@@ -3390,13 +3369,13 @@
dss_get_module_info()->dmi_xs_id,
oei->oei_map_ver, ioc.ioc_map_ver);

D_ALLOC_ARRAY(anchors, 2);
D_ALLOC_PTR(anchors);
if (anchors == NULL)
D_GOTO(out, rc = -DER_NOMEM);

anchors[0].ia_dkey = oei->oei_dkey_anchor;
anchors[0].ia_akey = oei->oei_akey_anchor;
anchors[0].ia_ev = oei->oei_anchor;
anchors->ia_dkey = oei->oei_dkey_anchor;
anchors->ia_akey = oei->oei_akey_anchor;
anchors->ia_ev = oei->oei_anchor;

/* TODO: Transfer the inline_thres from enumerate RPC */
enum_arg.inline_thres = 32;
@@ -3447,9 +3426,9 @@
if (rc)
D_GOTO(out, rc);

oeo->oeo_dkey_anchor = anchors[0].ia_dkey;
oeo->oeo_akey_anchor = anchors[0].ia_akey;
oeo->oeo_anchor = anchors[0].ia_ev;
oeo->oeo_dkey_anchor = anchors->ia_dkey;
oeo->oeo_akey_anchor = anchors->ia_akey;
oeo->oeo_anchor = anchors->ia_ev;

if (enum_arg.eprs)
oeo->oeo_eprs.ca_count = enum_arg.eprs_len;
@@ -3498,7 +3477,9 @@
struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct ds_cont_child *cont = ioc->ioc_coc;
struct dtx_share_peer *dsp;
uint64_t sched_seq;
uint32_t retry = 0;
int rc = 0;

if (daos_is_zero_dti(&opi->opi_dti)) {
Expand Down Expand Up @@ -3544,6 +3525,14 @@
}

if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc != -DER_AGAIN)
goto out;
@@ -4679,11 +4668,21 @@
struct daos_cpd_disp_ent *dcde, struct daos_cpd_sub_req *dcsrs,
struct obj_io_context *ioc, struct dtx_handle *dth)
{
int rc;
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;

again:
rc = ds_cpd_handle_one(rpc, dcsh, dcde, dcsrs, ioc, dth);
if (obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;