Skip to content

Commit

Permalink
DAOS-16749 vos: OI iterator for phase2 pool (#15465)
Browse files Browse the repository at this point in the history
To minimize bucket eviction/load when iterating objects, vos_iterate_obj()
is introduced to iterate objects in bucket ID order instead of OI order.
The caller of vos_iterate_obj() must provide a filter callback that calls
vos_bkt_iter_skip() as its last check.

Applied vos_iterate_obj() to EC and VOS aggregation.

Signed-off-by: Niu Yawei <[email protected]>
  • Loading branch information
NiuYawei authored Nov 12, 2024
1 parent d4070e8 commit fb89454
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 19 deletions.
34 changes: 34 additions & 0 deletions src/include/daos_srv/vos.h
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,40 @@ vos_iterate(vos_iter_param_t *param, vos_iter_type_t type, bool recursive,
struct vos_iter_anchors *anchors, vos_iter_cb_t pre_cb,
vos_iter_cb_t post_cb, void *arg, struct dtx_handle *dth);

/**
 * Iterate VOS objects and subtrees when recursive mode is specified. When it's
 * called against md-on-ssd phase2 pool, it iterates objects in bucket ID order
 * instead of OID order to minimize bucket eviction/load.
 *
 * For such a pool the object index is scanned once per visited bucket, and the
 * caller must supply \a param->ip_filter_cb (asserted) whose last check calls
 * vos_bkt_iter_skip(), so that objects of other buckets are skipped in each
 * round. \a param->ip_bkt_iter must be NULL on entry (asserted); it is used
 * internally during the call and reset to NULL before a bucket-ordered
 * iteration returns.
 *
 * \param[in] param		iteration parameters
 * \param[in] recursive		iterate in lower level recursively
 * \param[in] anchors		array of anchors, one for each
 *				iteration level
 * \param[in] pre_cb		pre subtree iteration callback
 * \param[in] post_cb		post subtree iteration callback
 * \param[in] arg		callback argument
 * \param[in] dth		DTX handle, must not be a valid DTX handle
 *				(asserted by the implementation)
 *
 * \retval 0			iteration complete
 * \retval > 0			callback return value
 * \retval -DER_NOMEM		failed to allocate the bucket iteration context
 * \retval -DER_*		error (but never -DER_NONEXIST)
 */
int
vos_iterate_obj(vos_iter_param_t *param, bool recursive, struct vos_iter_anchors *anchors,
vos_iter_cb_t pre_cb, vos_iter_cb_t post_cb, void *arg, struct dtx_handle *dth);

/**
 * Skip the object not located on specified bucket (for md-on-ssd phase2).
 *
 * Intended to be called from an iterator filter callback, for VOS_ITER_OBJ
 * entries only (asserted). Nothing is skipped when the pool isn't evictable or
 * when the iterator carries no bucket iteration context (i.e. it was started
 * through the common vos_iterate()). An object whose bucket ID is higher than
 * the bucket being iterated is recorded, so that vos_iterate_obj() visits that
 * bucket in a later round.
 *
 * \param[in] ih	Iterator handle
 * \param[in] desc	Iterator desc for current OI entry
 *
 * \return		true:	current entry is skipped
 *			false:	current entry isn't skipped
 */
bool
vos_bkt_iter_skip(daos_handle_t ih, vos_iter_desc_t *desc);

/**
* Retrieve the largest or smallest integer DKEY, AKEY, and array offset from an
* object. If object does not have an array value, 0 is returned in extent. User
Expand Down
10 changes: 8 additions & 2 deletions src/include/daos_srv/vos_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,12 @@ enum {

typedef struct {
union {
/** The object id of the entry */
daos_unit_oid_t id_oid;
struct {
/** The object id of the entry */
daos_unit_oid_t id_oid;
/** The bucket id of the object (for md-on-ssd phase2) */
uint32_t id_bkt;
};
/** The key for the entry */
d_iov_t id_key;
};
Expand Down Expand Up @@ -445,6 +449,8 @@ typedef struct {
vos_iter_filter_cb_t ip_filter_cb;
/** filter callback argument (vos_iterate only) */
void *ip_filter_arg;
/** auxiliary data for md-on-ssd phase2 OI iterator */
void *ip_bkt_iter;
/** flags for iterator */
uint32_t ip_flags;
} vos_iter_param_t;
Expand Down
11 changes: 9 additions & 2 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2400,6 +2400,13 @@ agg_filter(daos_handle_t ih, vos_iter_desc_t *desc, void *cb_arg, unsigned int *
*acts = VOS_ITER_CB_SKIP;
goto done;
}

/* This MUST be the last check */
if (desc->id_type == VOS_ITER_OBJ && vos_bkt_iter_skip(ih, desc)) {
agg_param->ap_credits++;
*acts |= VOS_ITER_CB_SKIP;
goto done;
}
done:
if (agg_param->ap_credits > agg_param->ap_credits_max) {
agg_param->ap_credits = 0;
Expand Down Expand Up @@ -2733,8 +2740,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
if (rc != 0)
goto update_hae;

rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb,
agg_iterate_post_cb, ec_agg_param, dth);
rc = vos_iterate_obj(&iter_param, true, &anchors, agg_iterate_pre_cb,
agg_iterate_post_cb, ec_agg_param, dth);
if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) {
uint64_t now = daos_gettime_coarse();

Expand Down
19 changes: 10 additions & 9 deletions src/vos/vos_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ vos_agg_filter(daos_handle_t ih, vos_iter_desc_t *desc, void *cb_arg, unsigned i
struct vos_agg_param *agg_param = cb_arg;
int rc = 0;

rc = need_aggregate(ih, agg_param, desc);
if (rc == 0) {
if (!need_aggregate(ih, agg_param, desc)) {
if (desc->id_type == VOS_ITER_OBJ) {
D_DEBUG(DB_EPC, "Skip untouched oid:"DF_UOID"\n",
DP_UOID(desc->id_oid));
Expand All @@ -360,9 +359,6 @@ vos_agg_filter(daos_handle_t ih, vos_iter_desc_t *desc, void *cb_arg, unsigned i
D_GOTO(out, rc = 0);
}

if (rc < 0) /** Ignore the filter error, let iterator handle it on actual probe */
D_GOTO(out, rc = 0);

if (desc->id_type == VOS_ITER_OBJ)
rc = oi_iter_check_punch(ih);
else
Expand All @@ -374,8 +370,14 @@ vos_agg_filter(daos_handle_t ih, vos_iter_desc_t *desc, void *cb_arg, unsigned i
inc_agg_counter(agg_param, desc->id_type, AGG_OP_DEL);
D_GOTO(out, rc = 0);
}
out:

/* This MUST be the last check */
if (desc->id_type == VOS_ITER_OBJ && vos_bkt_iter_skip(ih, desc)) {
credits_consume(&agg_param->ap_credits, AGG_OP_SCAN);
*acts |= VOS_ITER_CB_SKIP;
D_GOTO(out, rc = 0);
}
out:
if (credits_exhausted(&agg_param->ap_credits) ||
(DAOS_FAIL_CHECK(DAOS_VOS_AGG_RANDOM_YIELD) && (rand() % 2))) {
D_DEBUG(DB_EPC, "Credits exhausted, type:%u, acts:%u\n", desc->id_type, *acts);
Expand Down Expand Up @@ -2710,9 +2712,8 @@ vos_aggregate(daos_handle_t coh, daos_epoch_range_t *epr,

ad->ad_iter_param.ip_flags |= VOS_IT_FOR_PURGE | VOS_IT_FOR_AGG;
retry:
rc = vos_iterate(&ad->ad_iter_param, VOS_ITER_OBJ, true, &ad->ad_anchors,
vos_aggregate_pre_cb, vos_aggregate_post_cb,
&ad->ad_agg_param, NULL);
rc = vos_iterate_obj(&ad->ad_iter_param, true, &ad->ad_anchors, vos_aggregate_pre_cb,
vos_aggregate_post_cb, &ad->ad_agg_param, NULL);
if (rc == -DER_BUSY) {
/** Hit a conflict with obj_discard. Rather than exiting, let's
* yield and try again.
Expand Down
12 changes: 6 additions & 6 deletions src/vos/vos_dtx_iter.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -27,15 +27,15 @@ struct vos_dtx_iter {
};

static struct vos_dtx_iter *
iter2oiter(struct vos_iterator *iter)
iter2dtxiter(struct vos_iterator *iter)
{
return container_of(iter, struct vos_dtx_iter, oit_iter);
}

static int
dtx_iter_fini(struct vos_iterator *iter)
{
struct vos_dtx_iter *oiter = iter2oiter(iter);
struct vos_dtx_iter *oiter = iter2dtxiter(iter);
int rc = 0;

D_ASSERT(iter->it_type == VOS_ITER_DTX);
Expand Down Expand Up @@ -96,7 +96,7 @@ dtx_iter_prep(vos_iter_type_t type, vos_iter_param_t *param,
static int
dtx_iter_probe(struct vos_iterator *iter, daos_anchor_t *anchor, uint32_t next /* Unimplemented */)
{
struct vos_dtx_iter *oiter = iter2oiter(iter);
struct vos_dtx_iter *oiter = iter2dtxiter(iter);
struct vos_dtx_act_ent *dae;
d_iov_t rec_iov;
int rc = 0;
Expand Down Expand Up @@ -168,7 +168,7 @@ dtx_iter_probe(struct vos_iterator *iter, daos_anchor_t *anchor, uint32_t next /
static int
dtx_iter_next(struct vos_iterator *iter, daos_anchor_t *anchor)
{
struct vos_dtx_iter *oiter = iter2oiter(iter);
struct vos_dtx_iter *oiter = iter2dtxiter(iter);
struct vos_dtx_act_ent *dae;
d_iov_t rec_iov;
int rc = 0;
Expand Down Expand Up @@ -215,7 +215,7 @@ static int
dtx_iter_fetch(struct vos_iterator *iter, vos_iter_entry_t *it_entry,
daos_anchor_t *anchor)
{
struct vos_dtx_iter *oiter = iter2oiter(iter);
struct vos_dtx_iter *oiter = iter2dtxiter(iter);
struct vos_dtx_act_ent *dae;
d_iov_t rec_iov;
int rc;
Expand Down
6 changes: 6 additions & 0 deletions src/vos/vos_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,12 @@ gc_open_cont(struct vos_container *cont);
void
gc_close_cont(struct vos_container *cont);

/**
 * Per-call context of the bucket ordered object iteration (md-on-ssd phase2),
 * allocated by bkt_iter_alloc() together with its trailing bitmap.
 */
struct vos_bkt_iter {
	/** Total number of buckets, i.e. the number of bits used in bi_skipped */
	uint32_t bi_bkt_tot;
	/** ID of the bucket being iterated in the current round */
	uint32_t bi_bkt_cur;
	/**
	 * Bitmap indexed by bucket ID; a bit is set by vos_bkt_iter_skip() when an
	 * object of that (higher) bucket was skipped and the bucket still has to be
	 * iterated. Declared as a C99 flexible array member.
	 */
	uint8_t  bi_skipped[];
};

/**
* If the object is fully punched, bypass normal aggregation and move it to container
* discard pool.
Expand Down
75 changes: 75 additions & 0 deletions src/vos/vos_iterator.c
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,81 @@ vos_iterate_key(struct vos_object *obj, daos_handle_t toh, vos_iter_type_t type,
return rc;
}

/**
 * Release a bucket iteration context obtained from bkt_iter_alloc().
 *
 * \param[in] bkt_iter	context to release; it is handed to D_FREE() as is
 */
static inline void
bkt_iter_free(struct vos_bkt_iter *bkt_iter)
{
D_FREE(bkt_iter);
}

/**
 * Allocate the context used by vos_iterate_obj() to walk a pool bucket by bucket.
 *
 * The context is sized for a "skipped" bitmap holding one bit per bucket, where
 * the bucket count is taken from the ca_md_pages of the pool's umem cache. The
 * current bucket starts at UMEM_DEFAULT_MBKT_ID.
 *
 * NOTE(review): the bitmap isn't cleared explicitly here; this presumably relies
 * on D_ALLOC() returning zeroed memory -- confirm.
 *
 * \param[in] pool	pool whose store must have a cache with at least one
 *			bucket (asserted)
 *
 * \return		the new context, or NULL when the allocation failed
 */
static struct vos_bkt_iter *
bkt_iter_alloc(struct vos_pool *pool)
{
	struct umem_store   *store = vos_pool2store(pool);
	struct umem_cache   *cache = store->cache;
	struct vos_bkt_iter *bi;
	unsigned int         nbytes;

	D_ASSERT(cache != NULL && cache->ca_md_pages > 0);

	/* One bit per bucket, rounded up to whole bytes */
	nbytes = (cache->ca_md_pages + NBBY - 1) / NBBY;

	D_ALLOC(bi, sizeof(*bi) + nbytes);
	if (bi != NULL) {
		bi->bi_bkt_tot = cache->ca_md_pages;
		bi->bi_bkt_cur = UMEM_DEFAULT_MBKT_ID;
	}

	return bi;
}

/**
 * Iterate the objects of a container, in bucket ID order on evictable pools.
 *
 * When the pool isn't evictable, this is a plain VOS_ITER_OBJ iteration through
 * vos_iterate_internal(). Otherwise the object index is scanned once per visited
 * bucket: the first round handles UMEM_DEFAULT_MBKT_ID, and every later round
 * handles one bucket that vos_bkt_iter_skip() flagged in the skipped bitmap
 * during the prior rounds. Buckets which were never flagged are not visited.
 *
 * \param[in] param	iteration parameters. On evictable pools ip_filter_cb
 *			must be set and ip_bkt_iter must be NULL (asserted);
 *			ip_bkt_iter is reset to NULL before returning from the
 *			bucket ordered iteration.
 * \param[in] recursive	iterate in lower level recursively
 * \param[in] anchors	array of anchors, one for each iteration level
 * \param[in] pre_cb	pre subtree iteration callback
 * \param[in] post_cb	post subtree iteration callback
 * \param[in] arg	callback argument
 * \param[in] dth	DTX handle, must not be a valid handle (asserted)
 *
 * \retval 0		iteration complete
 * \retval -DER_NOMEM	failed to allocate the bucket iteration context
 * \retval others	the first non-zero value returned by
 *			vos_iterate_internal(), which stops the iteration
 */
int
vos_iterate_obj(vos_iter_param_t *param, bool recursive, struct vos_iter_anchors *anchors,
		vos_iter_cb_t pre_cb, vos_iter_cb_t post_cb, void *arg, struct dtx_handle *dth)
{
	struct vos_container	*cont;
	struct vos_bkt_iter	*bkt_iter;
	uint32_t		 i, iter_cnt = 0;
	int			 rc = 0;

	/* Not supposed to be called by external enumeration, which updates the read timestamp */
	D_ASSERT(!dtx_is_valid_handle(dth));

	cont = vos_hdl2cont(param->ip_hdl);
	if (!vos_pool_is_evictable(cont->vc_pool))
		return vos_iterate_internal(param, VOS_ITER_OBJ, recursive, false, anchors,
					    pre_cb, post_cb, arg, dth);

	/* The caller must provide a filter callback which calls vos_bkt_iter_skip() properly */
	D_ASSERT(param->ip_filter_cb != NULL && param->ip_bkt_iter == NULL);

	bkt_iter = bkt_iter_alloc(cont->vc_pool);
	if (bkt_iter == NULL)
		return -DER_NOMEM;

	param->ip_bkt_iter = bkt_iter;
	/*
	 * NOTE(review): the same anchors are handed to every round; confirm that
	 * vos_iterate_internal() leaves them in a state which restarts the object
	 * scan from the beginning for the next bucket.
	 */
	for (i = UMEM_DEFAULT_MBKT_ID; i < bkt_iter->bi_bkt_tot; i++) {
		if (i > UMEM_DEFAULT_MBKT_ID) {
			/* No object of this bucket was skipped in prior rounds, nothing to do */
			if (!isset(&bkt_iter->bi_skipped[0], i))
				continue;
			bkt_iter->bi_bkt_cur = i;
		}

		iter_cnt++;
		rc = vos_iterate_internal(param, VOS_ITER_OBJ, recursive, false, anchors,
					  pre_cb, post_cb, arg, dth);
		if (rc != 0) {
			/*
			 * Only a negative value is a failure; a positive value is what
			 * a callback returned to stop the iteration, so don't report
			 * it as an error.
			 */
			if (rc < 0)
				DL_ERROR(rc, "Iterate bucket:%u failed.", i);
			break;
		}
	}
	D_DEBUG(DB_TRACE, "Iterate %u/%u buckets.\n", iter_cnt, bkt_iter->bi_bkt_tot);

	bkt_iter_free(bkt_iter);
	param->ip_bkt_iter = NULL;

	return rc;
}

/**
* Iterate VOS entries (i.e., containers, objects, dkeys, etc.) and call \a
* cb(\a arg) for each entry.
Expand Down
44 changes: 44 additions & 0 deletions src/vos/vos_obj_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct vos_oi_iter {
struct vos_ilog_info oit_ilog_info;
/** punched epoch for current entry */
daos_epoch_t oit_punched;
/** auxiliary data for md-on-ssd phase2 OI iterator */
struct vos_bkt_iter *oit_bkt_iter;
/** cached iterator flags */
uint32_t oit_flags;
};
Expand Down Expand Up @@ -580,6 +582,7 @@ oi_iter_prep(vos_iter_type_t type, vos_iter_param_t *param,
oiter->oit_iter.it_filter_cb = param->ip_filter_cb;
oiter->oit_iter.it_filter_arg = param->ip_filter_arg;
oiter->oit_flags = param->ip_flags;
oiter->oit_bkt_iter = param->ip_bkt_iter;
if (param->ip_flags & VOS_IT_FOR_PURGE)
oiter->oit_iter.it_for_purge = 1;
if (param->ip_flags & VOS_IT_FOR_DISCARD)
Expand Down Expand Up @@ -635,6 +638,11 @@ oi_iter_match_probe(struct vos_iterator *iter, daos_anchor_t *anchor, uint32_t f
desc.id_type = VOS_ITER_OBJ;
desc.id_oid = obj->vo_id;
desc.id_parent_punch = 0;
if (vos_pool_is_evictable(oiter->oit_cont->vc_pool)) {
struct vos_obj_p2_df *p2 = (struct vos_obj_p2_df *)obj;

desc.id_bkt = p2->p2_bkt_ids[0];
}

feats = dbtree_feats_get(&obj->vo_tree);

Expand Down Expand Up @@ -963,6 +971,42 @@ struct vos_iter_ops vos_oi_iter_ops = {
.iop_process = oi_iter_process,
};

/**
 * Tell an object iterator filter whether the current OI entry belongs to another
 * bucket than the one being iterated, and hence has to be skipped in this round.
 *
 * \param[in] ih	handle of an object (VOS_ITER_OBJ) iterator
 * \param[in] desc	descriptor of the current OI entry; its id_type must be
 *			VOS_ITER_OBJ (asserted)
 *
 * \return		true:	the entry is skipped
 *			false:	the entry isn't skipped
 */
bool
vos_bkt_iter_skip(daos_handle_t ih, vos_iter_desc_t *desc)
{
	struct vos_iterator	*iter = vos_hdl2iter(ih);
	struct vos_oi_iter	*oiter;
	struct vos_bkt_iter	*bkt_iter;

	D_ASSERT(desc->id_type == VOS_ITER_OBJ);
	oiter = iter2oiter(iter);

	/*
	 * Bucket ordering applies only to evictable pools, and only when the
	 * iterator carries a bucket context; it carries none when called from
	 * the common vos_iterate().
	 */
	if (!vos_pool_is_evictable(oiter->oit_cont->vc_pool) || oiter->oit_bkt_iter == NULL)
		return false;

	bkt_iter = oiter->oit_bkt_iter;
	D_ASSERT(bkt_iter->bi_bkt_cur < bkt_iter->bi_bkt_tot);
	D_ASSERT(desc->id_bkt < bkt_iter->bi_bkt_tot);

	/* The object lives in the bucket being iterated, keep it */
	if (desc->id_bkt == bkt_iter->bi_bkt_cur)
		return false;

	/*
	 * A lower bucket ID is already iterated, so there is nothing to record.
	 * A higher bucket ID is marked in the skipped bitmap; vos_iterate_obj()
	 * will skip the bucket if it's not marked in the bitmap.
	 */
	if (desc->id_bkt > bkt_iter->bi_bkt_cur &&
	    !isset(&bkt_iter->bi_skipped[0], desc->id_bkt))
		setbit(&bkt_iter->bi_skipped[0], desc->id_bkt);

	return true;
}

/**
* Internal usage APIs
* For use from container APIs and init APIs
Expand Down

0 comments on commit fb89454

Please sign in to comment.