Skip to content

Commit

Permalink
Merge pull request #302 from imzhenyu/layer2-api-change
Browse files Browse the repository at this point in the history
Layer 1 api change due to layer 2 requirement
  • Loading branch information
imzhenyu committed Feb 22, 2016
2 parents 2b9de2b + 8e7227c commit 7c0fb83
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 44 deletions.
13 changes: 7 additions & 6 deletions doc/Doxyfile
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,11 @@ WARN_LOGFILE =

INPUT = intro.h \
install.h \
dev.h \
examples.layer1.h \
examples.layer2.h \
tools.h \
dev.h \
eon.overview.h \
tools.h \
examples.complexity.h \
extensions.h \
examples.network.h \
../include/dsn/c/app_model.h \
../include/dsn/service_api_c.h \
../include/dsn/c/api_task.h \
Expand All @@ -813,7 +811,10 @@ INPUT = intro.h \
../include/dsn/tool/fastrun.h \
../include/dsn/tool/nativerun.h \
../include/dsn/tool/simulator.h \
../include/dsn/tool_api.h
../include/dsn/tool_api.h \
examples.layer1.h \
examples.layer2.h \
examples.network.h


# This tag can be used to specify the character encoding of the source files
Expand Down
1 change: 0 additions & 1 deletion doc/dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Scalability, reliability, and availability (e.g., storage, micro-service)
@{
\defgroup l2-model Overview
\defgroup l2-ref API Reference
@}
Expand Down
28 changes: 28 additions & 0 deletions doc/eon.overview.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*!
@defgroup l2-model Overview
@ingroup dev-layer2
EON (rDSN Layer 2) scales out a local component (either legacy or from rDSN layer 1), and makes it
reliable automatically, to turn it into a real cloud service. We therefore call the application model in EON service model.
### Service Properties
In EON, we consider the following properties which largely decides the needed runtime support.
#### Stateless or stateful.
A service is stateless when the result for a service request remains the same when a service is just started bases on zero state, or running for a long time. A service is otherwise stateful, meaning that the accumulated state in the services needs to be carefully maintained to ensure correctness of the service requests. For example, the state must not be lost.
#### Partitioned or not.
When the state for a service is too large to be fit into a single machine's capacity (e.g., memory), developers usually partition the service according to certain rules (typically range or hash partitioning), to scale out the service.
#### Replicated or not.
When a service is stateful, replication is required to ensure high availability and reliability by replicating the state onto multiple copies. Meanwhile, multiple copies usually increase the read throughput of the system, not matter a service (or a service partition) is stateful or stateless. Replication has significant impact on how the read or write service calls are handled underneath. For example, a write service call (e.g., Put) may be propagated to all copies to ensure consistency, and a read service call (e.g., Get) may be issued to multiple copies for lower latency.
#### Service Model and Runtime Support Frameworks
According to the above service properties, EON separates the services into the following kinds, and provides runtime frameworks to turn layer 1 applications or legacy local libraries into a real service.
*/
39 changes: 27 additions & 12 deletions include/dsn/c/api_layer1.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,20 @@ rpc message read/write
\param rpc_code task code for this request
\param timeout_milliseconds timeout for the RPC call, 0 for default value as
configued in config files for the task code
\param hash if the task code is bound to a partitioned thread pool,
a hash value is needed to specify which thread in the pool should handle the request
\param request_hash if the task code is bound to a partitioned thread pool,
a thread hash is needed to specify which thread in the pool should handle the request
\param partition_hash if the target service is partitioned,
a partition hash is needed to specify which partition/shard should handle the request
\return RPC message handle
*/

extern DSN_API dsn_message_t dsn_msg_create_request(
dsn_task_code_t rpc_code,
int timeout_milliseconds DEFAULT(0),
int hash DEFAULT(0)
int request_hash DEFAULT(0),
uint64_t partition_hash DEFAULT(0)
);

/*! create a RPC response message correspondent to the given request message */
Expand All @@ -517,21 +521,32 @@ extern DSN_API void dsn_msg_add_ref(dsn_message_t msg);
/*! release reference to the message, paired with /ref dsn_msg_add_ref */
extern DSN_API void dsn_msg_release_ref(dsn_message_t msg);

/*! type of the parameter in \ref dsn_msg_context_t */
typedef enum dsn_msg_parameter_type_t
{
MSG_PARAM_NONE = 0, ///< nothing
MSG_PARAM_PARTITION_HASH = 1 ///< partition hash

} dsn_msg_parameter_type_t;

/*! RPC message context */
typedef union dsn_msg_context_t
{
struct {
uint64_t is_request : 1; ///< whether the RPC message is a request or response
uint64_t is_forwarded : 1; ///< whether the msg is forwarded or not
uint64_t write_replication : 1; ///< whether it is a write request to a replicated service
uint64_t read_replication : 1; ///< whether it is a read request to a replicated service
uint64_t read_semantic : 2; ///< see \ref read_semantic
uint64_t unused : 8;
uint64_t parameter : 50; ///< parameter for the flags, e.g., snapshort decree for replication read
uint64_t is_request : 1; ///< whether the RPC message is a request or response
uint64_t is_forwarded : 1; ///< whether the msg is forwarded or not
uint64_t is_replication_needed: 1; ///< whether state replication is needed for this request
uint64_t unused : 8; ///< not used yet
uint64_t parameter_type : 3; ///< type of the parameter next, see \ref dsn_msg_parameter_type_t
uint64_t parameter : 50; ///< piggybacked parameter for specific flags above
} u;
uint64_t context; ///< above flag specific information
uint64_t context; ///< msg_context is of sizeof(uint64_t)
} dsn_msg_context_t;

# define DSN_VNID_BUILD(app_id, par_idx) (((uint64_t)(app_id) << 32) | (uint64_t)(par_idx))
# define DSN_VNID_APP_ID(vnid) ((int)(vnid >> 32))
# define DSN_VNID_PARTITION_INDEX(vnid) ((int)(vnid & 0x00000000FFFFFFFFULL))

# define DSN_MSGM_TIMEOUT (0x1 << 0) ///< msg timeout is to be set/get
# define DSN_MSGM_HASH (0x1 << 1) ///< thread hash is to be set/get
# define DSN_MSGM_VNID (0x1 << 2) ///< virtual node id (vnid) is to be set/get
Expand All @@ -541,7 +556,7 @@ typedef union dsn_msg_context_t
typedef struct dsn_msg_options_t
{
int timeout_ms; ///< RPC timeout in milliseconds
int thread_hash; ///< thread hash on RPC server
int request_hash; ///< thread hash on RPC server
uint64_t vnid; ///< virtual node id, 0 for none
dsn_msg_context_t context; ///< see \ref dsn_msg_context_t
} dsn_msg_options_t;
Expand Down
20 changes: 12 additions & 8 deletions include/dsn/cpp/clientlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,11 @@ namespace dsn
TCallback&& callback,
int request_hash = 0,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
int reply_hash = 0)
int reply_hash = 0,
uint64_t partition_hash = 0
)
{
dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), request_hash);
dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), request_hash, partition_hash);
::marshall(msg, std::forward<TRequest>(req));
return call(server, msg, owner, std::forward<TCallback>(callback), reply_hash);
}
Expand All @@ -330,9 +332,11 @@ namespace dsn
dsn_task_code_t code,
TRequest&& req,
int request_hash = 0,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0))
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
uint64_t partition_hash = 0
)
{
dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), request_hash);
dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), request_hash, partition_hash);
::marshall(msg, std::forward<TRequest>(req));
return rpc_message_helper(msg);
}
Expand All @@ -354,15 +358,15 @@ namespace dsn
::dsn::rpc_address server,
dsn_task_code_t code,
const TRequest& req,
int hash = 0
int request_hash = 0,
uint64_t partition_hash = 0
)
{
dsn_message_t msg = dsn_msg_create_request(code, 0, hash);
dsn_message_t msg = dsn_msg_create_request(code, 0, request_hash, partition_hash);
::marshall(msg, req);
dsn_rpc_call_one_way(server.c_addr(), msg);
}



template<typename TResponse>
std::pair< ::dsn::error_code, TResponse> wait_and_unwrap(task_ptr task)
{
Expand Down
4 changes: 2 additions & 2 deletions include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ namespace dsn
}

// for request
rpc_write_stream(task_code code, int timeout_ms = 0, int hash = 0)
: safe_handle<dsn_msg_release_ref>(dsn_msg_create_request(code, timeout_ms, hash), false)
rpc_write_stream(task_code code, int timeout_ms = 0, int request_hash = 0, uint64_t partition_hash = 0)
: safe_handle<dsn_msg_release_ref>(dsn_msg_create_request(code, timeout_ms, request_hash, partition_hash), false)
{
_last_write_next_committed = true;
_last_write_next_total_size = 0;
Expand Down
7 changes: 6 additions & 1 deletion include/dsn/internal/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ namespace dsn
// routines for create messages
//
static message_ex* create_receive_message(const blob& data);
static message_ex* create_request(dsn_task_code_t rpc_code, int timeout_milliseconds = 0, int hash = 0);
static message_ex* create_request(
dsn_task_code_t rpc_code,
int timeout_milliseconds = 0,
int request_hash = 0,
uint64_t partition_hash = 0
);
message_ex* create_response();
message_ex* copy();
message_ex* copy_and_prepare_send();
Expand Down
8 changes: 8 additions & 0 deletions include/dsn/internal/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ class task_spec : public extensible_object<task_spec, 4>
throttling_mode_t rpc_request_throttling_mode; //
std::vector<int> rpc_request_delays_milliseconds; // see exp_delay for delaying recving
bool rpc_request_dropped_before_execution_when_timeout;

// layer 2 configurations
bool rpc_request_is_replicated_write_operation; // need stateful replication

// ]

task_rejection_handler rejection_handler;
Expand Down Expand Up @@ -267,6 +271,10 @@ CONFIG_BEGIN(task_spec)
CONFIG_FLD_ENUM(throttling_mode_t, rpc_request_throttling_mode, TM_NONE, TM_INVALID, false, "throttling mode for rpc requets: TM_NONE, TM_REJECT, TM_DELAY when queue length > pool.queue_length_throttling_threshold")
CONFIG_FLD_INT_LIST(rpc_request_delays_milliseconds, "how many milliseconds to delay recving rpc session for when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, e.g., 0, 0, 1, 2, 5, 10")
CONFIG_FLD(bool, bool, rpc_request_dropped_before_execution_when_timeout, false, "whether to drop a request right before execution when its queueing time is already greater than its timeout value")

// layer 2 configurations
CONFIG_FLD(bool, bool, rpc_request_is_replicated_write_operation, false, "whether this is a replicated and write request")

CONFIG_END

struct threadpool_spec
Expand Down
33 changes: 22 additions & 11 deletions src/core/core/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ using namespace dsn::utils;
# define __TITLE__ "message"
#define CRC_INVALID 0xdead0c2c

DSN_API dsn_message_t dsn_msg_create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int hash)
DSN_API dsn_message_t dsn_msg_create_request(
dsn_task_code_t rpc_code,
int timeout_milliseconds,
int request_hash,
uint64_t partition_hash
)
{
auto msg = ::dsn::message_ex::create_request(rpc_code, timeout_milliseconds, hash);
return msg;
return ::dsn::message_ex::create_request(rpc_code, timeout_milliseconds, request_hash, partition_hash);
}

DSN_API dsn_message_t dsn_msg_copy(dsn_message_t msg)
Expand Down Expand Up @@ -134,7 +138,7 @@ DSN_API void dsn_msg_set_options(

if (mask & DSN_MSGM_HASH)
{
hdr->client.hash = opts->thread_hash;
hdr->client.hash = opts->request_hash;
}

if (mask & DSN_MSGM_VNID)
Expand All @@ -155,7 +159,7 @@ DSN_API void dsn_msg_get_options(
{
auto hdr = ((::dsn::message_ex*)msg)->header;
opts->context = hdr->context;
opts->thread_hash = hdr->client.hash;
opts->request_hash = hdr->client.hash;
opts->timeout_ms = hdr->client.timeout_ms;
opts->vnid = hdr->vnid;
}
Expand Down Expand Up @@ -389,7 +393,7 @@ message_ex* message_ex::copy_and_prepare_send()
return copy;
}

message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int hash)
message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int request_hash, uint64_t partition_hash)
{
message_ex* msg = new message_ex();
msg->_is_read = false;
Expand All @@ -398,10 +402,9 @@ message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_mil
// init header
auto& hdr = *msg->header;
memset(&hdr, 0, sizeof(hdr));
hdr.hdr_crc32 = hdr.body_crc32 = CRC_INVALID;

if (0 != hash)
hdr.client.hash = hash;
hdr.hdr_crc32 = hdr.body_crc32 = CRC_INVALID;

hdr.client.hash = request_hash;

if (0 == timeout_milliseconds)
{
Expand All @@ -412,12 +415,20 @@ message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_mil
hdr.client.timeout_ms = timeout_milliseconds;
}

strncpy(hdr.rpc_name, dsn_task_code_to_string(rpc_code), sizeof(hdr.rpc_name));
task_spec* sp = task_spec::get(rpc_code);
strncpy(hdr.rpc_name, sp->name.c_str(), sizeof(hdr.rpc_name));
hdr.rpc_name_fast.local_rpc_id = (uint32_t)rpc_code;
hdr.rpc_name_fast.local_hash = s_local_hash;

hdr.id = new_id();

hdr.context.u.is_request = true;
if (0 != partition_hash)
{
hdr.context.u.parameter_type = MSG_PARAM_PARTITION_HASH;
hdr.context.u.parameter = partition_hash;
}
hdr.context.u.is_replication_needed = sp->rpc_request_is_replicated_write_operation;

msg->local_rpc_code = (uint32_t)rpc_code;
return msg;
Expand Down
2 changes: 1 addition & 1 deletion src/core/tests/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ TEST(core, message_ex)

dsn_msg_get_options(request, &opts);
ASSERT_EQ(100, opts.timeout_ms);
ASSERT_EQ(1, opts.thread_hash);
ASSERT_EQ(1, opts.request_hash);
ASSERT_EQ(333u, opts.vnid);
ASSERT_EQ(444u, opts.context.context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void replication_app_client_base::call_with_address(dsn::rpc_address addr, reque

dsn_msg_options_t opts;
opts.timeout_ms = request->timeout_ms;
opts.thread_hash = gpid_to_hash(request->read_header.gpid);
opts.request_hash = gpid_to_hash(request->read_header.gpid);
opts.vnid = *(uint64_t*)(&request->read_header.gpid);
dsn_msg_set_options(request->request, &opts, DSN_MSGM_HASH | DSN_MSGM_TIMEOUT); // TODO: not supported yet DSN_MSGM_VNID);
}
Expand All @@ -336,7 +336,7 @@ void replication_app_client_base::call_with_address(dsn::rpc_address addr, reque

dsn_msg_options_t opts;
opts.timeout_ms = request->timeout_ms;
opts.thread_hash = gpid_to_hash(request->write_header.gpid);
opts.request_hash = gpid_to_hash(request->write_header.gpid);
opts.vnid = *(uint64_t*)(&request->write_header.gpid);

dsn_msg_set_options(request->request, &opts, DSN_MSGM_HASH | DSN_MSGM_TIMEOUT); // TODO: not supported yet DSN_MSGM_VNID | DSN_MSGM_CONTEXT);
Expand Down

0 comments on commit 7c0fb83

Please sign in to comment.