Skip to content

Commit

Permalink
i#5694 core-sharded: Add core-serial support (#6519)
Browse files Browse the repository at this point in the history
Adds a new scheduler option single_lockstep_output which multiplexes the
virtual core output streams onto a single global stream. This is simple
to implement as the existing scheduler_t::stream_t class already
multiplexes inputs onto an output.

Hooks up the drcachesim launcher -core_serial option to this new
scheduler mode.

Updates the schedule_stats, basic_counts, and cache_simulator tools to
support core_serial. For cache_simulator, the existing thread-to-core
mapping code for round-robin and for -cpu_scheduling is kept for when in
thread-sharded mode; in core-sharded mode, the scheduler's cpuid is
mapped to a core index.

Adds a core_serial test of schedule_stats and basic_counts and a test of
cache_simulator using the scheduler's -cpu_schedule_file as-traced mode.

Adds some dr$sim unit tests for cpuid to core mapping and error modes.

Issue: #5694
  • Loading branch information
derekbruening committed Dec 21, 2023
1 parent 86a4f1b commit d2f47f3
Show file tree
Hide file tree
Showing 17 changed files with 432 additions and 57 deletions.
18 changes: 16 additions & 2 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
sched_inputs[0] = std::move(workload);

typename sched_type_t::scheduler_options_t sched_ops;
int output_count = worker_count_;
if (shard_type_ == SHARD_BY_CORE) {
// Subclass must pass us options and set worker_count_ to # cores.
if (worker_count_ <= 0) {
Expand All @@ -291,15 +292,23 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
sched_ops = std::move(options);
if (sched_ops.quantum_unit == sched_type_t::QUANTUM_TIME)
sched_by_time_ = true;
if (!parallel_) {
// output_count remains the # of virtual cores, but we have just
// one worker thread. The scheduler multiplexes the output_count output
// cores onto a single stream for us with this option:
sched_ops.single_lockstep_output = true;
worker_count_ = 1;
}
} else if (parallel_) {
sched_ops = sched_type_t::make_scheduler_parallel_options(verbosity_);
if (worker_count_ <= 0)
worker_count_ = std::thread::hardware_concurrency();
output_count = worker_count_;
} else {
sched_ops = sched_type_t::make_scheduler_serial_options(verbosity_);
worker_count_ = 1;
output_count = 1;
}
int output_count = worker_count_;
if (scheduler_.init(sched_inputs, output_count, std::move(sched_ops)) !=
sched_type_t::STATUS_SUCCESS) {
ERRMSG("Failed to initialize scheduler: %s\n",
Expand Down Expand Up @@ -470,7 +479,12 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_serial(analyzer_worker_data_t &
uint64_t cur_micros = sched_by_time_ ? get_current_microseconds() : 0;
typename sched_type_t::stream_status_t status =
worker.stream->next_record(record, cur_micros);
if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_WAIT) {
record = create_wait_marker();
} else if (status == sched_type_t::STATUS_IDLE) {
assert(shard_type_ == SHARD_BY_CORE);
record = create_idle_marker();
} else if (status != sched_type_t::STATUS_OK) {
if (status != sched_type_t::STATUS_EOF) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker.error =
Expand Down
2 changes: 2 additions & 0 deletions clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
init_scheduler(const std::string &trace_path, memref_tid_t only_thread, int verbosity,
typename sched_type_t::scheduler_options_t options);

// For core-sharded, worker_count_ must be set prior to calling this; for parallel
// mode if it is not set it will be set to the underlying core count.
bool
init_scheduler(std::unique_ptr<ReaderType> reader,
std::unique_ptr<ReaderType> reader_end, int verbosity,
Expand Down
11 changes: 1 addition & 10 deletions clients/drcachesim/analyzer_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,7 @@ analyzer_multi_t::analyzer_multi_t()
scheduler_t::scheduler_options_t sched_ops;
if (op_core_sharded.get_value() || op_core_serial.get_value()) {
if (op_core_serial.get_value()) {
// TODO i#5694: Add serial core-sharded support by having the
// analyzer create #cores streams but walk them in lockstep.
// Then, update drcachesim to use get_output_cpuid().
error_string_ = "-core_serial is not yet implemented";
success_ = false;
return;
parallel_ = false;
}
sched_ops = init_dynamic_schedule();
}
Expand Down Expand Up @@ -284,10 +279,6 @@ analyzer_multi_t::init_dynamic_schedule()
bool
analyzer_multi_t::create_analysis_tools()
{
/* TODO i#2006: add multiple tool support. */
/* TODO i#2006: create a single top-level tool for multi-component
* tools.
*/
tools_ = new analysis_tool_t *[max_num_tools_];
if (!op_simulator_type.get_value().empty()) {
std::stringstream stream(op_simulator_type.get_value());
Expand Down
5 changes: 3 additions & 2 deletions clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ droption_t<bool> op_cpu_scheduling(
"round-robin fashion. This option causes the scheduler to instead use the recorded "
"cpu that each thread executed on (at a granularity of the trace buffer size) "
"for scheduling, mapping traced cpu's to cores and running each segment of each "
"thread "
"on the core that owns the recorded cpu for that segment.");
"thread on the core that owns the recorded cpu for that segment. "
"This option is not supported with -core_serial; use "
"-cpu_schedule_file with -core_serial instead.");

droption_t<bytesize_t> op_max_trace_size(
DROPTION_SCOPE_CLIENT, "max_trace_size", 0,
Expand Down
31 changes: 21 additions & 10 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::stream_t::next_record(RecordType &record,
uint64_t cur_time)
{
if (max_ordinal_ > 0) {
++ordinal_;
if (ordinal_ >= max_ordinal_)
ordinal_ = 0;
}
input_info_t *input = nullptr;
sched_type_t::stream_status_t res =
scheduler_->next_record(ordinal_, record, input, cur_time);
Expand Down Expand Up @@ -633,13 +638,19 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
static_cast<int>(sched_type_t::SCHEDULER_SPECULATE_NOPS));

outputs_.reserve(output_count);
if (options_.single_lockstep_output) {
global_stream_ = std::unique_ptr<sched_type_t::stream_t>(
new sched_type_t::stream_t(this, 0, verbosity_, output_count));
}
for (int i = 0; i < output_count; ++i) {
outputs_.emplace_back(this, i,
TESTANY(SCHEDULER_SPECULATE_NOPS, options_.flags)
? spec_type_t::USE_NOPS
// TODO i#5843: Add more flags for other options.
: spec_type_t::LAST_FROM_TRACE,
create_invalid_record(), verbosity_);
if (options_.single_lockstep_output)
outputs_.back().stream = global_stream_.get();
if (options_.schedule_record_ostream != nullptr) {
sched_type_t::stream_status_t status = record_schedule_segment(
i, schedule_record_t::VERSION, schedule_record_t::VERSION_CURRENT, 0, 0);
Expand Down Expand Up @@ -1523,22 +1534,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
return sched_type_t::STATUS_REGION_INVALID;
}
input.in_cur_region = true;
auto &stream = outputs_[output].stream;
auto *stream = outputs_[output].stream;

// We've documented that an output stream's ordinals ignore skips in its input
// streams, so we do not need to remember the input's ordinals pre-skip and increase
// our output's ordinals commensurately post-skip.

// If we skipped from the start we may not have seen the initial headers:
// use the input's cached copies.
if (stream.version_ == 0) {
stream.version_ = input.reader->get_version();
stream.last_timestamp_ = input.reader->get_last_timestamp();
stream.first_timestamp_ = input.reader->get_first_timestamp();
stream.filetype_ = input.reader->get_filetype();
stream.cache_line_size_ = input.reader->get_cache_line_size();
stream.chunk_instr_count_ = input.reader->get_chunk_instr_count();
stream.page_size_ = input.reader->get_page_size();
if (stream->version_ == 0) {
stream->version_ = input.reader->get_version();
stream->last_timestamp_ = input.reader->get_last_timestamp();
stream->first_timestamp_ = input.reader->get_first_timestamp();
stream->filetype_ = input.reader->get_filetype();
stream->cache_line_size_ = input.reader->get_cache_line_size();
stream->chunk_instr_count_ = input.reader->get_chunk_instr_count();
stream->page_size_ = input.reader->get_page_size();
}
// We let the user know we've skipped. There's no discontinuity for the
// first one so we do not insert a marker there (if we do want to insert one,
Expand Down Expand Up @@ -1837,7 +1848,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
std::lock_guard<std::mutex> lock(*inputs_[input].lock);

if (!switch_sequence_.empty() &&
outputs_[output].stream.get_instruction_ordinal() > 0) {
outputs_[output].stream->get_instruction_ordinal() > 0) {
sched_type_t::switch_type_t switch_type = SWITCH_INVALID;
if (prev_workload != inputs_[input].workload)
switch_type = SWITCH_PROCESS;
Expand Down
26 changes: 22 additions & 4 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
std::unique_ptr<ReaderType> kernel_switch_reader;
/** The end reader for #kernel_switch_reader. */
std::unique_ptr<ReaderType> kernel_switch_reader_end;
/**
* If true, enables a mode where all outputs are serialized into one global outer
* layer output. The single global output stream alternates in round-robin
* lockstep among each core output. The core outputs operate just like they
* would with no serialization, other than timing differences relative to other
* core outputs.
*/
bool single_lockstep_output = false;
};

/**
Expand Down Expand Up @@ -628,9 +636,10 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
class stream_t : public memtrace_stream_t {
public:
stream_t(scheduler_tmpl_t<RecordType, ReaderType> *scheduler, int ordinal,
int verbosity = 0)
int verbosity = 0, int max_ordinal = -1)
: scheduler_(scheduler)
, ordinal_(ordinal)
, max_ordinal_(max_ordinal)
, verbosity_(verbosity)
{
}
Expand Down Expand Up @@ -933,6 +942,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
protected:
scheduler_tmpl_t<RecordType, ReaderType> *scheduler_ = nullptr;
int ordinal_ = -1;
// If max_ordinal_ >= 0, ordinal_ is incremented modulo max_ordinal_ at the start
// of every next_record() invocation.
int max_ordinal_ = -1;
int verbosity_ = 0;
uint64_t cur_ref_count_ = 0;
uint64_t cur_instr_count_ = 0;
Expand Down Expand Up @@ -971,7 +983,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
{
if (ordinal < 0 || ordinal >= static_cast<output_ordinal_t>(outputs_.size()))
return nullptr;
return &outputs_[ordinal].stream;
return outputs_[ordinal].stream;
}

/** Returns the number of input streams. */
Expand Down Expand Up @@ -1160,12 +1172,16 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
output_ordinal_t ordinal,
typename spec_type_t::speculator_flags_t speculator_flags,
RecordType last_record_init, int verbosity = 0)
: stream(scheduler, ordinal, verbosity)
: self_stream(scheduler, ordinal, verbosity)
, stream(&self_stream)
, speculator(speculator_flags, verbosity)
, last_record(last_record_init)
{
}
stream_t stream;
stream_t self_stream;
// Normally stream points to &self_stream, but for single_lockstep_output
// it points to a global stream shared among all outputs.
stream_t *stream;
// This is an index into the inputs_ vector so -1 is an invalid value.
// This is set to >=0 for all non-empty outputs during init().
input_ordinal_t cur_input = INVALID_INPUT_ORDINAL;
Expand Down Expand Up @@ -1510,6 +1526,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
};
std::unordered_map<switch_type_t, std::vector<RecordType>, switch_type_hash_t>
switch_sequence_;
// For single_lockstep_output.
std::unique_ptr<stream_t> global_stream_;
};

/** See #dynamorio::drmemtrace::scheduler_tmpl_t. */
Expand Down
17 changes: 10 additions & 7 deletions clients/drcachesim/simulator/cache_simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,16 @@ cache_simulator_t::process_memref(const memref_t &memref)
}

int core;
if (memref.data.tid == last_thread_)
core = last_core_;
else {
if (shard_type_ == SHARD_BY_THREAD) {
if (memref.data.tid == last_thread_)
core = last_core_;
else {
core = core_for_thread(memref.data.tid);
last_thread_ = memref.data.tid;
last_core_ = core;
}
} else
core = core_for_thread(memref.data.tid);
last_thread_ = memref.data.tid;
last_core_ = core;
}

// To support swapping to physical addresses without modifying the passed-in
// memref (which is also passed to other tools run at the same time) we use
Expand Down Expand Up @@ -593,7 +596,7 @@ cache_simulator_t::print_results()
// Print core and associated L1 cache stats first.
for (unsigned int i = 0; i < knobs_.num_cores; i++) {
print_core(i);
if (thread_ever_counts_[i] > 0) {
if (shard_type_ == SHARD_BY_CORE || thread_ever_counts_[i] > 0) {
if (l1_icaches_[i] != l1_dcaches_[i]) {
std::cerr << " " << l1_icaches_[i]->get_name() << " ("
<< l1_icaches_[i]->get_description() << ") stats:" << std::endl;
Expand Down
65 changes: 58 additions & 7 deletions clients/drcachesim/simulator/simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ simulator_t::init_knobs(unsigned int num_cores, uint64_t skip_refs, uint64_t war
knob_verbose_ = verbose;
last_thread_ = 0;
last_core_ = 0;
cpu_counts_.resize(knob_num_cores_, 0);
thread_counts_.resize(knob_num_cores_, 0);
thread_ever_counts_.resize(knob_num_cores_, 0);
if (shard_type_ == SHARD_BY_THREAD) {
cpu_counts_.resize(knob_num_cores_, 0);
thread_counts_.resize(knob_num_cores_, 0);
thread_ever_counts_.resize(knob_num_cores_, 0);
}

if (knob_warmup_refs_ > 0 && (knob_warmup_fraction_ > 0.0)) {
ERRMSG("Usage error: Either warmup_refs OR warmup_fraction can be set");
Expand All @@ -87,13 +89,32 @@ simulator_t::init_knobs(unsigned int num_cores, uint64_t skip_refs, uint64_t war
}
}

std::string
simulator_t::initialize_stream(memtrace_stream_t *serial_stream)
{
serial_stream_ = serial_stream;
return "";
}

std::string
simulator_t::initialize_shard_type(shard_type_t shard_type)
{
shard_type_ = shard_type;
if (shard_type_ == SHARD_BY_CORE && knob_cpu_scheduling_) {
return "Usage error: -cpu_scheduling not supported with -core_serial; use "
"-cpu_schedule_file with -core_serial instead";
}
return "";
}

bool
simulator_t::process_memref(const memref_t &memref)
{
if (memref.marker.type != TRACE_TYPE_MARKER)
return true;
if (memref.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID && knob_cpu_scheduling_) {
int cpu = (int)(intptr_t)memref.marker.marker_value;
assert(shard_type_ == SHARD_BY_THREAD);
int64_t cpu = static_cast<int64_t>(memref.marker.marker_value);
if (cpu < 0)
return true;
int min_core;
Expand Down Expand Up @@ -218,6 +239,31 @@ simulator_t::find_emptiest_core(std::vector<int> &counts) const
int
simulator_t::core_for_thread(memref_tid_t tid)
{
if (shard_type_ == SHARD_BY_CORE) {
int64_t cpu = serial_stream_->get_output_cpuid();
// While the scheduler uses a 0-based ordinal for all but replaying as-traced,
// to handle as-traced (and because the docs for get_output_cpuid() do not
// guarantee 0-based), we map to a 0-based index just by incrementing an index
// as we discover each cpu.
// XXX: Should we add a new stream API for get_output_ordinal()? That would
// be a more faithful mapping than our dynamic discovery here -- although the
// lockstep ordering by the scheduler should have our ordinals in order.
if (cpu == last_cpu_)
return last_core_;
int core;
auto exists = cpu2core_.find(cpu);
if (exists == cpu2core_.end()) {
core = static_cast<int>(cpu2core_.size());
cpu2core_[cpu] = core;
if (knob_verbose_ >= 1) {
std::cerr << "new cpu " << cpu << " => core " << core << "\n";
}
} else
core = exists->second;
last_cpu_ = cpu;
last_core_ = core;
return core;
}
auto exists = thread2core_.find(tid);
if (exists != thread2core_.end())
return exists->second;
Expand All @@ -242,6 +288,8 @@ simulator_t::core_for_thread(memref_tid_t tid)
void
simulator_t::handle_thread_exit(memref_tid_t tid)
{
if (shard_type_ == SHARD_BY_CORE)
return;
std::unordered_map<memref_tid_t, int>::iterator exists = thread2core_.find(tid);
assert(exists != thread2core_.end());
assert(thread_counts_[exists->second] > 0);
Expand All @@ -256,17 +304,20 @@ simulator_t::handle_thread_exit(memref_tid_t tid)
void
simulator_t::print_core(int core) const
{
if (!knob_cpu_scheduling_) {
if (!knob_cpu_scheduling_ && shard_type_ == SHARD_BY_THREAD) {
std::cerr << "Core #" << core << " (" << thread_ever_counts_[core]
<< " thread(s))" << std::endl;
} else {
std::cerr << "Core #" << core;
if (cpu_counts_[core] == 0) {
if (shard_type_ == SHARD_BY_THREAD && cpu_counts_[core] == 0) {
// We keep the "(s)" mainly to simplify test templates.
std::cerr << " (0 traced CPU(s))" << std::endl;
return;
}
std::cerr << " (" << cpu_counts_[core] << " traced CPU(s): ";
std::cerr << " (";
if (shard_type_ == SHARD_BY_THREAD) // Always 1:1 for SHARD_BY_CORE.
std::cerr << cpu_counts_[core] << " ";
std::cerr << "traced CPU(s): ";
bool need_comma = false;
for (auto iter = cpu2core_.begin(); iter != cpu2core_.end(); ++iter) {
if (iter->second == core) {
Expand Down
Loading

0 comments on commit d2f47f3

Please sign in to comment.