Skip to content

Commit

Permalink
DAOS-16469 dtx: properly handle DTX partial commit
Browse files Browse the repository at this point in the history
When a DTX leader globally commit the DTX, it is possible that some
DTX participant(s) cannot commit such DTX entry because of kinds of
issues, such as network or space trouble. Under such case, the DTX
leader needs to keep the active DTX entry persistently for further
commit/resync. But it does not means related modification attched
to such DTX entry on the leader target cannot be committed, instead,
we can commit related modification with only keeping the DTX header.
That is enough for the DTX leader to do further DTX commit/resync
to handle related former failed DTX participant(s).

The benefit is that VOS aggregation on the leader target will not
be affected by remote DTX commit failure.

Allow-unstable-test: true

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
Nasf-Fan committed Nov 9, 2024
1 parent 6a4364e commit b612777
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 165 deletions.
2 changes: 1 addition & 1 deletion src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ dtx_coll_local_one(void *args)

switch (opc) {
case DTX_COLL_COMMIT:
rc = vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, NULL);
rc = vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, false, NULL);
break;
case DTX_COLL_ABORT:
rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch);
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ dtx_end(struct dtx_handle *dth, struct ds_cont_child *cont, int result)
* and can be committed next time.
*/
rc = vos_dtx_commit(cont->sc_hdl, dth->dth_dti_cos,
dth->dth_dti_cos_count, NULL);
dth->dth_dti_cos_count, false, NULL);
if (rc < 0)
D_ERROR(DF_UUID": Fail to DTX CoS commit: %d\n",
DP_UUID(cont->sc_uuid), rc);
Expand Down
45 changes: 32 additions & 13 deletions src/dtx/dtx_cos.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ dtx_cos_del_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc)
return rc;
}

static void
dtx_cos_demote_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc)
{
d_list_del(&dcrc->dcrc_gl_committable);
if (dcrc->dcrc_coll)
d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list);
else
d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list);
}

int
dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
daos_unit_oid_t *oid, daos_epoch_t epoch, bool force,
Expand Down Expand Up @@ -622,7 +632,7 @@ dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,

int
dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash)
daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote)
{
struct dtx_cos_key key;
d_iov_t kiov;
Expand All @@ -645,36 +655,41 @@ dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,

d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 1);
}
}

d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 2);
}
}

d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 3);
}
}

out:
if (found > 0)
if (found > 0 && !demote)
d_tm_dec_gauge(dtx_tls_get()->dt_committable, 1);

if (rc == 0 && found == 0)
rc = -DER_NONEXIST;

DL_CDEBUG(rc != 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, rc,
"Remove DTX from CoS cache "DF_UOID", key %lu",
DP_UOID(*oid), (unsigned long)dkey_hash);

return rc == -DER_NONEXIST ? 0 : rc;
}

Expand Down Expand Up @@ -778,10 +793,14 @@ dtx_cos_batched_del(struct ds_cont_child *cont, struct dtx_id xid[], bool rm[],
if (memcmp(&dcrc->dcrc_dte->dte_xid, &xid[i], sizeof(struct dtx_id)) == 0) {
found = true;

if (rm[i]) {
rc = dtx_cos_del_one(cont, dcrc);
if (rc == 0)
del++;
if (rm != NULL) {
if (rm[i]) {
rc = dtx_cos_del_one(cont, dcrc);
if (rc == 0)
del++;
}
} else {
dtx_cos_demote_one(cont, dcrc);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
int dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags);
int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash);
daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote);
uint64_t dtx_cos_oldest(struct ds_cont_child *cont);
void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash);
Expand Down
105 changes: 70 additions & 35 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct dtx_req_args {
int dra_length;
/* The collective RPC result. */
int dra_result;
uint32_t dra_local_fail:1;
/* Pointer to the committed DTX list, used for DTX_REFRESH case. */
d_list_t *dra_cmt_list;
/* Pointer to the aborted DTX list, used for DTX_REFRESH case. */
Expand All @@ -81,6 +82,7 @@ struct dtx_req_rec {
int drr_count; /* DTX count */
int drr_result; /* The RPC result */
uint32_t drr_comp:1,
drr_local_fail:1,
drr_single_dti:1;
uint32_t drr_inline_flags;
struct dtx_id *drr_dti; /* The DTX array */
Expand Down Expand Up @@ -290,10 +292,13 @@ dtx_req_send(struct dtx_req_rec *drr, daos_epoch_t epoch)
"DTX req for opc %x to %d/%d (req %p future %p) sent epoch "DF_X64,
dra->dra_opc, drr->drr_rank, drr->drr_tag, req, dra->dra_future, epoch);

if (rc != 0 && drr->drr_comp == 0) {
drr->drr_comp = 1;
drr->drr_result = rc;
ABT_future_set(dra->dra_future, drr);
if (rc != 0) {
drr->drr_local_fail = 1;
if (drr->drr_comp == 0) {
drr->drr_comp = 1;
drr->drr_result = rc;
ABT_future_set(dra->dra_future, drr);
}
}

return rc;
Expand All @@ -309,13 +314,17 @@ dtx_req_list_cb(void **args)
if (dra->dra_opc == DTX_CHECK) {
for (i = 0; i < dra->dra_length; i++) {
drr = args[i];
if (drr->drr_local_fail)
dra->dra_local_fail = 1;
dtx_merge_check_result(&dra->dra_result, drr->drr_result);
D_DEBUG(DB_TRACE, "The DTX "DF_DTI" RPC req result %d, status is %d.\n",
DP_DTI(&drr->drr_dti[0]), drr->drr_result, dra->dra_result);
}
} else {
for (i = 0; i < dra->dra_length; i++) {
drr = args[i];
if (drr->drr_local_fail)
dra->dra_local_fail = 1;
if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST)
dra->dra_result = drr->drr_result;
}
Expand Down Expand Up @@ -382,7 +391,12 @@ dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance)
if (rc != ABT_SUCCESS) {
D_ERROR("ABT_future_create failed for opc %x, len %d: rc %d.\n",
dra->dra_opc, dca->dca_steps, rc);
return dss_abterr2der(rc);
dca->dca_dra.dra_local_fail = 1;
if (dra->dra_opc == DTX_CHECK)
dtx_merge_check_result(&dra->dra_result, dss_abterr2der(rc));
else if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST)
dra->dra_result = dss_abterr2der(rc);
return DSS_CHORE_DONE;
}

D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n",
Expand Down Expand Up @@ -750,7 +764,12 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
switch (opc) {
case DTX_COMMIT:
case DTX_ABORT:
if (rc != -DER_EXCLUDED && rc != -DER_OOG)
/*
* Continue to send out more RPCs as long as there is no local failure,
* then other healthy participants can commit/abort related DTX entries
* without being affected by the bad one(s).
*/
if (dca->dca_dra.dra_local_fail)
goto out;
break;
case DTX_CHECK:
Expand Down Expand Up @@ -826,17 +845,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED || rc == -DER_OOG)
rc = 0;

if (rc != 0) {
/*
* Some DTX entries may have been committed on some participants. Then mark all
* the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
* It is harmless to re-commit the DTX that has ever been committed.
*/
if (dra->dra_committed > 0)
rc1 = vos_dtx_set_flags(cont->sc_hdl, dca.dca_dtis, count,
DTE_PARTIAL_COMMITTED);
} else {
if (has_cos) {
if (rc == 0 || dra->dra_committed > 0) {
if (rc == 0 && has_cos) {
if (count > 1) {
D_ALLOC_ARRAY(rm_cos, count);
if (rm_cos == NULL)
Expand All @@ -846,7 +856,12 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
}
}

rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rm_cos);
/*
* Some DTX entries may have been committed on some participants. Then mark all
* the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
* It is harmless to re-commit the DTX that has ever been committed.
*/
rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
if (rc1 > 0) {
dra->dra_committed += rc1;
rc1 = 0;
Expand All @@ -855,13 +870,28 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
rc1 = 0;
}

if (rc1 == 0 && rm_cos != NULL) {
/*
* For partial commit case, move related DTX entries to the tail of the
* committable list, then the next batched commit can commit others and
* retry those partial committed sometime later instead of blocking the
* others committable with continuously retry the failed ones.
*
* The side-effect of such behavior is that the DTX which is committable
* earlier maybe delay committed than the later ones.
*/
if (rc1 == 0 && has_cos) {
if (dcks != NULL) {
for (i = 0; i < count; i++) {
if (rm_cos[i]) {
D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub));
if (rm_cos != NULL) {
for (i = 0; i < count; i++) {
if (!rm_cos[i])
continue;
dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid,
dcks[i].dkey_hash);
dcks[i].dkey_hash, false);
}
} else {
for (i = 0; i < count; i++) {
dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid,
dcks[i].dkey_hash, true);
}
}
} else {
Expand Down Expand Up @@ -1141,7 +1171,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che
* It has been committed/committable on leader, we may miss
* related DTX commit request, so let's commit it locally.
*/
rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, NULL);
rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, false, NULL);
if (rc1 == 0 || rc1 == -DER_NONEXIST || !for_io /* cleanup case */) {
d_list_del(&dsp->dsp_link);
dtx_dsp_free(dsp);
Expand Down Expand Up @@ -1636,24 +1666,29 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d
committed += dcra.dcra_committed;
}

if (rc == 0 && rc1 == 0)
rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, NULL);
else if (committed > 0)
if ((rc == 0 && rc1 == 0) || committed > 0) {
/* Mark the DTX as "PARTIAL_COMMITTED" and re-commit it later via cleanup logic. */
rc2 = vos_dtx_set_flags(cont->sc_hdl, &dce->dce_xid, 1, DTE_PARTIAL_COMMITTED);
if (rc2 > 0 || rc2 == -DER_NONEXIST)
rc2 = 0;
rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, rc != 0 || rc1 != 0, NULL);
if (rc2 > 0 || rc2 == -DER_NONEXIST)
rc2 = 0;
}

/*
* NOTE: Currently, we commit collective DTX one by one with high priority. So here we have
* to remove the collective DTX entry from the CoS even if the commit failed remotely.
* Otherwise, the batched commit ULT may be blocked by such "bad" entry.
* For partial commit case, move related DTX entries to the tail of the
* committable list, then the next batched commit can commit others and
* retry those partial committed sometime later instead of blocking the
* others committable with continuously retry the failed ones.
*
* The side-effect of such behavior is that the DTX which is committable
* earlier maybe delay committed than the later ones.
*/
if (rc2 == 0 && has_cos) {
if (dck != NULL)
dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash);
dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash,
rc != 0 || rc1 != 0);
else
dtx_cos_batched_del(cont, &dce->dce_xid, &cos, 1);
dtx_cos_batched_del(cont, &dce->dce_xid,
rc != 0 || rc1 != 0 ? NULL : &cos, 1);
}

D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ dtx_handler(crt_rpc_t *rpc)
count = din->di_dtx_array.ca_count - i;

dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i;
rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, NULL);
rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, false, NULL);
if (rc1 > 0)
committed += rc1;
else if (rc == 0 && rc1 < 0)
Expand Down
3 changes: 2 additions & 1 deletion src/include/daos_srv/vos.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,14 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
* \param coh [IN] Container open handle.
* \param dtis [IN] The array for DTX identifiers to be committed.
* \param count [IN] The count of DTXs to be committed.
* \param keep_act [IN] Keep DTX entry or not.
* \param rm_cos [OUT] The array for whether remove entry from CoS cache.
*
* \return Negative value if error.
* \return Others are for the count of committed DTXs.
*/
int
vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool rm_cos[]);
vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act, bool rm_cos[]);

/**
* Abort the specified DTXs.
Expand Down
Loading

0 comments on commit b612777

Please sign in to comment.