Skip to content

Commit

Permalink
improve mutation_data write_to/read_from, write task_code as string
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan committed Jul 29, 2016
1 parent ebb7acc commit 4ed36f1
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 63 deletions.
2 changes: 2 additions & 0 deletions include/dsn/cpp/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ namespace dsn
binary_writer(blob& buffer);
virtual ~binary_writer();

virtual void flush();

template<typename T> void write_pod(const T& val);
template<typename T> void write(const T& val) { dassert(false, "write of this type is not implemented"); }
void write(const int8_t& val) { write_pod(val); }
Expand Down
8 changes: 7 additions & 1 deletion include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ namespace dsn
}
}

~rpc_write_stream()
virtual ~rpc_write_stream()
{
flush();
}

virtual void flush() override
{
binary_writer::flush();
commit_buffer();
}

Expand Down
1 change: 1 addition & 0 deletions include/dsn/tool-api/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ namespace dsn
//
DSN_API void write_next(void** ptr, size_t* size, size_t min_size);
DSN_API void write_commit(size_t size);
DSN_API void write_append(const blob& data);
DSN_API bool read_next(void** ptr, size_t* size);
DSN_API void read_commit(size_t size);
size_t body_size() { return (size_t)header->body_length; }
Expand Down
16 changes: 16 additions & 0 deletions src/core/src/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,22 @@ void message_ex::write_commit(size_t size)
this->header->body_length += (int)size;
}

void message_ex::write_append(const blob& data)
{
// printf("%p %s\n", this, __FUNCTION__);
dassert(!this->_is_read && this->_rw_committed, "there are pending msg write not committed"
", please invoke dsn_msg_write_next and dsn_msg_write_commit in pairs");

int size = data.length();
if (size > 0)
{
this->_rw_index++;
this->_rw_offset += size;
this->buffers.push_back(data);
this->header->body_length += size;
}
}

bool message_ex::read_next(void** ptr, size_t* size)
{
// printf("%p %s %d\n", this, __FUNCTION__, utils::get_current_tid());
Expand Down
6 changes: 6 additions & 0 deletions src/dev/cpp/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ namespace dsn
{
}

void binary_writer::flush()
{
commit();
}

void binary_writer::create_buffer(size_t size)
{
commit();
Expand Down Expand Up @@ -387,6 +392,7 @@ namespace dsn
blob binary_writer::get_buffer()
{
commit();

if (_buffers.size() == 1)
{
return _buffers[0];
Expand Down
87 changes: 44 additions & 43 deletions src/dist/replication/lib/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# include "mutation.h"
# include "mutation_log.h"
# include "replica.h"
# include <dsn/tool-api/rpc_message.h>

namespace dsn { namespace replication {

Expand Down Expand Up @@ -136,40 +137,7 @@ void mutation::add_client_request(task_code code, dsn_message_t request)
dassert(client_requests.size() == data.updates.size(), "size must be equal");
}

void mutation::write_to(binary_writer& writer) const
{
marshall(writer, data, DSF_THRIFT_BINARY);
}

/*static*/ mutation_ptr mutation::read_from(binary_reader& reader, dsn_message_t from)
{
mutation_ptr mu(new mutation());
unmarshall(reader, mu->data, DSF_THRIFT_BINARY);

for (const mutation_update& update : mu->data.updates)
{
dassert(update.code != TASK_CODE_INVALID, "invalid mutation task code");
}

mu->client_requests.resize(mu->data.updates.size());

if (nullptr != from)
{
mu->_prepare_request = from;
dsn_msg_add_ref(from); // released on dctor
}

snprintf_p(mu->_name, sizeof(mu->_name),
"%" PRId32 ".%" PRId32 ".%" PRId64 ".%" PRId64,
mu->data.header.pid.get_app_id(),
mu->data.header.pid.get_partition_index(),
mu->data.header.ballot,
mu->data.header.decree);

return mu;
}

void mutation::write_to_log_file(std::function<void(const blob&)> inserter) const
void mutation::write_to(std::function<void(const blob&)> inserter) const
{
{
binary_writer temp_writer;
Expand All @@ -178,8 +146,16 @@ void mutation::write_to_log_file(std::function<void(const blob&)> inserter) cons

for (const mutation_update& update : data.updates)
{
temp_writer.write_pod(static_cast<int>(update.code));
// write task_code as string to make it cross-process compatible.
// avoid memory copy, equal to writer.write(std::string)
const char* cstr = update.code.to_string();
int len = static_cast<int>(strlen(cstr));
temp_writer.write_pod(len);
if (len > 0)
temp_writer.write(cstr, len);

temp_writer.write_pod(static_cast<int>(update.serialization_type));

temp_writer.write_pod(static_cast<int>(update.data.length()));
}

Expand All @@ -192,25 +168,46 @@ void mutation::write_to_log_file(std::function<void(const blob&)> inserter) cons
}
}

void mutation::write_to_log_file(binary_writer& writer) const
void mutation::write_to(binary_writer& writer, dsn_message_t to) const
{
writer.write_pod(data.header);
writer.write_pod(static_cast<int>(data.updates.size()));

for (const mutation_update& update : data.updates)
{
writer.write_pod(static_cast<int>(update.code));
// write task_code as string to make it cross-process compatible.
// avoid memory copy, equal to writer.write(std::string)
const char* cstr = update.code.to_string();
int len = static_cast<int>(strlen(cstr));
writer.write_pod(len);
if (len > 0)
writer.write(cstr, len);

writer.write_pod(static_cast<int>(update.serialization_type));

writer.write_pod(static_cast<int>(update.data.length()));
}

for (const mutation_update& update : data.updates)
if (nullptr != to)
{
writer.write(update.data.data(), update.data.length());
writer.flush();

message_ex* msg = (message_ex*)to;
for (const mutation_update& update : data.updates)
{
msg->write_append(update.data);
}
}
else
{
for (const mutation_update& update : data.updates)
{
writer.write(update.data.data(), update.data.length());
}
}
}

/*static*/ mutation_ptr mutation::read_from_log_file(binary_reader& reader, dsn_message_t from)
/*static*/ mutation_ptr mutation::read_from(binary_reader& reader, dsn_message_t from)
{
mutation_ptr mu(new mutation());
reader.read_pod(mu->data.header);
Expand All @@ -220,12 +217,16 @@ void mutation::write_to_log_file(binary_writer& writer) const
std::vector<int> lengths(size, 0);
for (int i = 0; i < size; ++i)
{
int code;
reader.read_pod(code);
mu->data.updates[i].code = ::dsn::task_code(code);
std::string name;
reader.read(name);
::dsn::task_code code(dsn_task_code_from_string(name.c_str(), TASK_CODE_INVALID));
dassert(code != TASK_CODE_INVALID, "invalid mutation task code: %s", name.c_str());
mu->data.updates[i].code = code;

int type;
reader.read_pod(type);
mu->data.updates[i].serialization_type = type;

reader.read_pod(lengths[i]);
}
for (int i = 0; i < size; ++i)
Expand Down
16 changes: 5 additions & 11 deletions src/dist/replication/lib/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,15 @@ class mutation : public ref_counter

// >= 1 MB
bool is_full() const { return _appro_data_bytes >= 1024 * 1024; }

// general reader & writer
void write_to(binary_writer& writer) const;
static mutation_ptr read_from(binary_reader& reader, dsn_message_t from);

// write-to/read-from mutation log file, for better performance
// write & write mutation data, optimized for better performance
//
// TODO(qinzuoyan): the optimization is to marshall code as int but not string,
// but this may cause problem, because log may be used by different program and
// the code map may change:
// task_code should be marshalled as string for cross-process compatiblity, because:
// - the private log may be transfered to other node with different program
// - the private/shared log may be replayed by different program when server restart
void write_to_log_file(std::function<void(const blob&)> inserter) const;
void write_to_log_file(binary_writer& writer) const;
static mutation_ptr read_from_log_file(binary_reader& reader, dsn_message_t from);
void write_to(std::function<void(const blob&)> inserter) const;
void write_to(binary_writer& writer, dsn_message_t to) const;
static mutation_ptr read_from(binary_reader& reader, dsn_message_t from);

// data
mutation_data data;
Expand Down
10 changes: 5 additions & 5 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ::dsn::task_ptr mutation_log_shared::append(mutation_ptr& mu,
// write mutation to pending buffer
mu->data.header.log_offset = _pending_write_start_offset + _pending_write->size();
//printf("%lld: %lld\n", d, mu->data.header.log_offset);
mu->write_to_log_file([this](blob bb)
mu->write_to([this](blob bb)
{
_pending_write->add(bb);
});
Expand Down Expand Up @@ -246,7 +246,7 @@ ::dsn::task_ptr mutation_log_private::append(mutation_ptr& mu,
// write mutation to pending buffer
mu->data.header.log_offset = _pending_write_start_offset + _pending_write->size();
//printf("%lld: %lld\n", d, mu->data.header.log_offset);
mu->write_to_log_file([this](blob bb)
mu->write_to([this](blob bb)
{
_pending_write->add(bb);
});
Expand Down Expand Up @@ -297,13 +297,13 @@ bool mutation_log_private::get_learn_state_in_memory(
{
for (auto& mu : *issued_mutations)
{
mu->write_to_log_file(writer);
mu->write_to(writer, nullptr);
}
}

for (auto& mu : pending_mutations)
{
mu->write_to_log_file(writer);
mu->write_to(writer, nullptr);
}

return r;
Expand Down Expand Up @@ -862,7 +862,7 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size, bool
while (!reader->is_eof())
{
auto old_size = reader->get_remaining_size();
mutation_ptr mu = mutation::read_from_log_file(*reader, nullptr);
mutation_ptr mu = mutation::read_from(*reader, nullptr);
dassert(nullptr != mu, "");
mu->set_logged();

Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void replica::send_prepare_message(
rpc_write_stream writer(msg);
marshall(writer, get_gpid(), DSF_THRIFT_BINARY);
marshall(writer, rconfig, DSF_THRIFT_BINARY);
mu->write_to(writer);
mu->write_to(writer, msg);
}

mu->remote_tasks()[addr] = rpc::call(addr, msg,
Expand Down
4 changes: 2 additions & 2 deletions src/dist/replication/lib/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ void replica::on_learn(dsn_message_t msg, const learn_request& request)
{
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "");
mu->write_to(writer);
mu->write_to(writer, nullptr);
count++;
}
response.type = learn_type::LT_CACHE;
Expand Down Expand Up @@ -1056,7 +1056,7 @@ error_code replica::apply_learned_state_from_private_log(learn_state& state)
binary_reader reader(state.meta);
while (!reader.is_eof())
{
auto mu = mutation::read_from_log_file(reader, nullptr);
auto mu = mutation::read_from(reader, nullptr);
auto d = mu->data.header.decree;
if (d <= plist.last_committed_decree())
continue;
Expand Down

0 comments on commit 4ed36f1

Please sign in to comment.