Skip to content

Commit

Permalink
Merge pull request #625 from lukemartinlogan/dev
Browse files Browse the repository at this point in the history
Comment out score histogram
  • Loading branch information
lukemartinlogan authored Oct 19, 2023
2 parents 3aa267d + 7986794 commit 65a5564
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 39 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ if(thallium_FOUND)
endif()

# Boost
find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
# find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
find_package(Boost REQUIRED COMPONENTS fiber REQUIRED)
if (Boost_FOUND)
message(STATUS "found boost at ${Boost_INCLUDE_DIRS}")
endif()
Expand Down
2 changes: 1 addition & 1 deletion ci/hermes/packages/hermes/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Hermes(CMakePackage):
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic')
depends_on('libfabric fabrics=sockets,tcp,udp,rxm,rxd,verbs',
when='+ares')
depends_on('libfabric fabrics=verbs',
Expand Down
2 changes: 1 addition & 1 deletion ci/hermes/packages/hermes_shm/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class HermesShm(CMakePackage):
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic +regex')
depends_on('libfabric fabrics=sockets,tcp,udp,rxm,rxd,verbs',
when='+ares')
depends_on('libfabric fabrics=verbs',
Expand Down
6 changes: 3 additions & 3 deletions include/hermes/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ class Bucket {
* */
void ReorganizeBlob(const BlobId &blob_id,
float score) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0);
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true);
}

/**
Expand All @@ -402,7 +402,7 @@ class Bucket {
void ReorganizeBlob(const BlobId &blob_id,
float score,
Context &ctx) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0);
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true);
}

/**
Expand All @@ -412,7 +412,7 @@ class Bucket {
float score,
u32 node_id,
Context &ctx) {
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id);
blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id, true);
}

/**
Expand Down
9 changes: 5 additions & 4 deletions include/hermes/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ class ConfigurationManager {
mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_mdm");
blob_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_blob_mdm");
bkt_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_bkt_mdm");
op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm",
bkt_mdm_.id_, blob_mdm_.id_);
stager_mdm_.CreateRoot(DomainId::GetGlobal(),
"hermes_stager_mdm", blob_mdm_.id_);
// TODO(llogan): add back
// op_mdm_.CreateRoot(DomainId::GetGlobal(), "hermes_op_mdm",
// bkt_mdm_.id_, blob_mdm_.id_);
// stager_mdm_.CreateRoot(DomainId::GetGlobal(),
// "hermes_stager_mdm", blob_mdm_.id_);
blob_mdm_.SetBucketMdmRoot(DomainId::GetGlobal(),
bkt_mdm_.id_,
stager_mdm_.id_, op_mdm_.id_);
Expand Down
2 changes: 2 additions & 0 deletions include/hermes/hermes_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,12 @@ struct BlobInfo {
size_t blob_size_; /**< The overall size of the blob */
size_t max_blob_size_; /**< The amount of space current buffers support */
float score_; /**< The priority of this blob */
float user_score_; /**< The user-defined priority of this blob */
std::atomic<u32> access_freq_; /**< Number of times blob accessed in epoch */
hshm::Timepoint last_access_; /**< Last time blob accessed */
std::atomic<size_t> mod_count_; /**< The number of times blob modified */
std::atomic<size_t> last_flush_; /**< The last mod that was flushed */
bitfield32_t flags_; /**< Flags */

/** Serialization */
template<typename Ar>
Expand Down
12 changes: 6 additions & 6 deletions tasks/bdev/include/bdev/bdev.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ class Client : public TaskLibClient {
class Server {
public:
ssize_t rem_cap_; /**< Remaining capacity */
Histogram score_hist_; /**< Score distribution */
// Histogram score_hist_; /**< Score distribution */

public:
void UpdateScore(UpdateScoreTask *task, RunContext &ctx) {
if (task->old_score_ >= 0) {
score_hist_.Decrement(task->old_score_);
}
score_hist_.Increment(task->new_score_);
// if (task->old_score_ >= 0) {
// score_hist_.Decrement(task->old_score_);
// }
// score_hist_.Increment(task->new_score_);
}

void Monitor(MonitorTask *task, RunContext &ctx) {
task->rem_cap_ = rem_cap_;
task->score_hist_ = score_hist_;
// task->score_hist_ = score_hist_;
}
};

Expand Down
2 changes: 1 addition & 1 deletion tasks/bdev/include/bdev/bdev_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ struct ReadTask : public Task, TaskFlags<TF_LOCAL> {
/** A task to monitor bdev statistics */
struct MonitorTask : public Task, TaskFlags<TF_LOCAL> {
OUT size_t rem_cap_; /**< Remaining capacity of the target */
OUT Histogram score_hist_; /**< Score distribution */
// OUT Histogram score_hist_; /**< Score distribution */

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ class Client : public TaskLibClient {
const TagId &tag_id,
const BlobId &blob_id,
float score,
u32 node_id) {
u32 node_id,
bool user_score) {
// HILOG(kDebug, "Beginning REORGANIZE (task_node={})", task_node);
HRUN_CLIENT->ConstructTask<ReorganizeBlobTask>(
task, task_node, DomainId::GetNode(blob_id.node_id_), id_,
tag_id, blob_id, score, node_id);
tag_id, blob_id, score, node_id, user_score);
}
HRUN_TASK_NODE_PUSH_ROOT(ReorganizeBlob);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class PutBlobPhase {
#define HERMES_BLOB_DID_CREATE BIT_OPT(u32, 4)
#define HERMES_GET_BLOB_ID BIT_OPT(u32, 5)
#define HERMES_HAS_DERIVED BIT_OPT(u32, 6)
#define HERMES_USER_SCORE_STATIONARY BIT_OPT(u32, 7)

/** A task to put data in a blob */
struct PutBlobTask : public Task, TaskFlags<TF_SRL_ASYM_START | TF_SRL_SYM_END> {
Expand Down Expand Up @@ -1075,6 +1076,7 @@ struct ReorganizeBlobTask : public Task, TaskFlags<TF_SRL_SYM> {
IN BlobId blob_id_;
IN float score_;
IN u32 node_id_;
IN bool is_user_score_;
TEMP int phase_ = ReorganizeBlobPhase::kGet;
TEMP hipc::Pointer data_;
TEMP size_t data_size_;
Expand All @@ -1095,7 +1097,8 @@ struct ReorganizeBlobTask : public Task, TaskFlags<TF_SRL_SYM> {
const TagId &tag_id,
const BlobId &blob_id,
float score,
u32 node_id) : Task(alloc) {
u32 node_id,
bool is_user_score) : Task(alloc) {
// Initialize task
task_node_ = task_node;
lane_hash_ = blob_id.hash_;
Expand All @@ -1110,6 +1113,7 @@ struct ReorganizeBlobTask : public Task, TaskFlags<TF_SRL_SYM> {
blob_id_ = blob_id;
score_ = score;
node_id_ = node_id;
is_user_score_ = is_user_score;
}

/** (De)serialize message call */
Expand Down
58 changes: 44 additions & 14 deletions tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,36 @@ class Server : public TaskLib {
if (access_score > 1) {
access_score = 1;
}
return std::max(freq_score, access_score);
float data_score = std::max(freq_score, access_score);
float user_score = blob_info.user_score_;
if (!blob_info.flags_.Any(HERMES_USER_SCORE_STATIONARY)) {
user_score *= data_score;
}
return std::max(data_score, user_score);
}

/** Check if blob should be reorganized */
template<bool UPDATE_SCORE=false>
bool ShouldReorganize(BlobInfo &blob_info,
float score,
TaskNode &task_node) {
// for (BufferInfo &buf : blob_info.buffers_) {
// TargetInfo &target = *target_map_[buf.tid_];
// Histogram &hist = target.monitor_task_->score_hist_;
// if constexpr(UPDATE_SCORE) {
// target.AsyncUpdateScore(task_node + 1,
// blob_info.score_, score);
// }
// u32 percentile = hist.GetPercentile(score);
// size_t rem_cap = target.monitor_task_->rem_cap_;
// size_t max_cap = target.max_cap_;
// if (rem_cap < max_cap / 10) {
// if (percentile < 10 || percentile > 90) {
// return true;
// }
// }
// }
return false;
}

/**
Expand All @@ -150,22 +179,11 @@ class Server : public TaskLib {
BlobInfo &blob_info = it.second;
// Update blob scores
float new_score = MakeScore(blob_info, now);
bool reorganize = false;
for (BufferInfo &buf : blob_info.buffers_) {
TargetInfo &target = *target_map_[buf.tid_];
Histogram &hist = target.monitor_task_->score_hist_;
target.AsyncUpdateScore(task->task_node_ + 1,
blob_info.score_, new_score);
u32 percentile = hist.GetPercentile(blob_info.score_);
if (percentile < 10 || percentile > 90) {
reorganize = true;
}
}
if (reorganize) {
if (ShouldReorganize<true>(blob_info, new_score, task->task_node_)) {
blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1,
blob_info.tag_id_,
blob_info.blob_id_,
new_score, 0);
new_score, 0, false);
}
blob_info.access_freq_ = 0;
blob_info.score_ = new_score;
Expand Down Expand Up @@ -345,6 +363,7 @@ class Server : public TaskLib {
for (BufferInfo &buf : blob_info.buffers_) {
TargetInfo &target = *target_map_[buf.tid_];
std::vector<BufferInfo> buf_vec = {buf};
// TODO(llogan): add back
target.AsyncFree(task->task_node_ + 1,
blob_info.score_,
std::move(buf_vec), true);
Expand Down Expand Up @@ -663,6 +682,17 @@ class Server : public TaskLib {
return;
}
BlobInfo &blob_info = it->second;
if (task->is_user_score_) {
blob_info.user_score_ = task->score_;
blob_info.score_ = std::max(blob_info.user_score_,
blob_info.score_);
} else {
blob_info.score_ = task->score_;
}
if (!ShouldReorganize(blob_info, task->score_, task->task_node_)) {
task->SetModuleComplete();
return;
}
task->data_ = HRUN_CLIENT->AllocateBuffer(blob_info.blob_size_).shm_;
task->data_size_ = blob_info.blob_size_;
task->get_task_ = blob_mdm_.AsyncGetBlob(task->task_node_ + 1,
Expand Down
4 changes: 2 additions & 2 deletions tasks/posix_bdev/src/posix_bdev.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ namespace hermes::posix_bdev {
alloc_.Allocate(task->size_, *task->buffers_, task->alloc_size_);
HILOG(kDebug, "Allocated {}/{} bytes ({})", task->alloc_size_, task->size_, path_);
rem_cap_ -= task->alloc_size_;
score_hist_.Increment(task->score_);
// score_hist_.Increment(task->score_);
task->SetModuleComplete();
}

void Free(FreeTask *task, RunContext &rctx) {
rem_cap_ += alloc_.Free(task->buffers_);
score_hist_.Decrement(task->score_);
// score_hist_.Decrement(task->score_);
task->SetModuleComplete();
}

Expand Down
6 changes: 3 additions & 3 deletions tasks/ram_bdev/src/ram_bdev.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Server : public TaskLib, public bdev::Server {
rem_cap_ = dev_info.capacity_;
alloc_.Init(id_, dev_info.capacity_, dev_info.slab_sizes_);
mem_ptr_ = (char*)malloc(dev_info.capacity_);
score_hist_.Resize(10);
// score_hist_.Resize(10);
HILOG(kDebug, "Created {} at {} of size {}",
dev_info.dev_name_, dev_info.mount_point_, dev_info.capacity_);
task->SetModuleComplete();
Expand All @@ -35,14 +35,14 @@ class Server : public TaskLib, public bdev::Server {
HILOG(kDebug, "Allocating {} bytes (RAM)", task->size_);
alloc_.Allocate(task->size_, *task->buffers_, task->alloc_size_);
rem_cap_ -= task->alloc_size_;
score_hist_.Increment(task->score_);
// score_hist_.Increment(task->score_);
HILOG(kDebug, "Allocated {} bytes (RAM)", task->alloc_size_);
task->SetModuleComplete();
}

void Free(FreeTask *task, RunContext &rctx) {
rem_cap_ += alloc_.Free(task->buffers_);
score_hist_.Decrement(task->score_);
// score_hist_.Decrement(task->score_);
task->SetModuleComplete();
}

Expand Down

0 comments on commit 65a5564

Please sign in to comment.