From b306e578dcde968b2d485562b3c165e1b6222520 Mon Sep 17 00:00:00 2001 From: Zhenyu Guo Date: Fri, 19 Feb 2016 20:29:31 +0800 Subject: [PATCH 1/4] add eon overview --- doc/Doxyfile | 13 +++++++------ doc/dev.h | 1 - doc/eon.overview.h | 28 ++++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) create mode 100644 doc/eon.overview.h diff --git a/doc/Doxyfile b/doc/Doxyfile index 5cd43531..d6ede8b3 100644 --- a/doc/Doxyfile +++ b/doc/Doxyfile @@ -783,13 +783,11 @@ WARN_LOGFILE = INPUT = intro.h \ install.h \ - dev.h \ - examples.layer1.h \ - examples.layer2.h \ - tools.h \ + dev.h \ + eon.overview.h \ + tools.h \ examples.complexity.h \ extensions.h \ - examples.network.h \ ../include/dsn/c/app_model.h \ ../include/dsn/service_api_c.h \ ../include/dsn/c/api_task.h \ @@ -813,7 +811,10 @@ INPUT = intro.h \ ../include/dsn/tool/fastrun.h \ ../include/dsn/tool/nativerun.h \ ../include/dsn/tool/simulator.h \ - ../include/dsn/tool_api.h + ../include/dsn/tool_api.h \ + examples.layer1.h \ + examples.layer2.h \ + examples.network.h # This tag can be used to specify the character encoding of the source files diff --git a/doc/dev.h b/doc/dev.h index dc960932..c30312fe 100644 --- a/doc/dev.h +++ b/doc/dev.h @@ -13,7 +13,6 @@ Scalability, reliability, and availability (e.g., storage, micro-service) @{ - \defgroup l2-model Overview \defgroup l2-ref API Reference @} diff --git a/doc/eon.overview.h b/doc/eon.overview.h new file mode 100644 index 00000000..e57785f5 --- /dev/null +++ b/doc/eon.overview.h @@ -0,0 +1,28 @@ +/*! + @defgroup l2-model Overview + @ingroup dev-layer2 + + EON (rDSN Layer 2) scales out a local component (either legacy or from rDSN layer 1), and makes it + reliable automatically, to turn it into a real cloud service. We therefore call the application model in EON the service model. + +### Service Properties + +In EON, we consider the following properties, which largely decide the needed runtime support. + +#### Stateless or stateful. 
+ +A service is stateless when the result for a service request remains the same whether the service has just started from zero state or has been running for a long time. A service is otherwise stateful, meaning that the accumulated state in the service needs to be carefully maintained to ensure correctness of the service requests. For example, the state must not be lost. + +#### Partitioned or not. + +When the state for a service is too large to fit into a single machine's capacity (e.g., memory), developers usually partition the service according to certain rules (typically range or hash partitioning), to scale out the service. + +#### Replicated or not. + +When a service is stateful, replication is required to ensure high availability and reliability by replicating the state onto multiple copies. Meanwhile, multiple copies usually increase the read throughput of the system, no matter whether a service (or a service partition) is stateful or stateless. Replication has a significant impact on how the read or write service calls are handled underneath. For example, a write service call (e.g., Put) may be propagated to all copies to ensure consistency, and a read service call (e.g., Get) may be issued to multiple copies for lower latency. + +#### Service Model and Runtime Support Frameworks + +According to the above service properties, EON separates the services into the following kinds, and provides runtime frameworks to turn layer 1 applications or legacy local libraries into a real service. 
+ + */ \ No newline at end of file From 56888bcaac3003d68e78e39d40fa9b2886e09dbe Mon Sep 17 00:00:00 2001 From: Zhenyu Guo Date: Sun, 21 Feb 2016 23:34:08 +0800 Subject: [PATCH 2/4] add service API interface change due to layer 2 requirement --- include/dsn/c/api_layer1.h | 39 +++++++++++++++++++++--------- include/dsn/cpp/clientlet.h | 20 +++++++++------ include/dsn/cpp/rpc_stream.h | 4 +-- include/dsn/internal/rpc_message.h | 7 +++++- include/dsn/internal/task_spec.h | 8 ++++++ src/core/core/rpc_message.cpp | 33 ++++++++++++++++--------- 6 files changed, 77 insertions(+), 34 deletions(-) diff --git a/include/dsn/c/api_layer1.h b/include/dsn/c/api_layer1.h index c12ff02c..a3ac7b4b 100644 --- a/include/dsn/c/api_layer1.h +++ b/include/dsn/c/api_layer1.h @@ -493,8 +493,11 @@ rpc message read/write \param rpc_code task code for this request \param timeout_milliseconds timeout for the RPC call, 0 for default value as configued in config files for the task code - \param hash if the task code is bound to a partitioned thread pool, - a hash value is needed to specify which thread in the pool should handle the request + \param request_hash if the task code is bound to a partitioned thread pool, + a thread hash is needed to specify which thread in the pool should handle the request + + \param partition_hash if the target service is partitioned, + a partition hash is needed to specify which partition/shard should handle the request \return RPC message handle */ @@ -502,7 +505,8 @@ rpc message read/write extern DSN_API dsn_message_t dsn_msg_create_request( dsn_task_code_t rpc_code, int timeout_milliseconds DEFAULT(0), - int hash DEFAULT(0) + int request_hash DEFAULT(0), + uint64_t partition_hash DEFAULT(0) ); /*! create a RPC response message correspondent to the given request message */ @@ -517,21 +521,32 @@ extern DSN_API void dsn_msg_add_ref(dsn_message_t msg); /*! 
release reference to the message, paired with /ref dsn_msg_add_ref */ extern DSN_API void dsn_msg_release_ref(dsn_message_t msg); +/*! type of the parameter in \ref dsn_msg_context_t */ +typedef enum dsn_msg_parameter_type_t +{ + MSG_PARAM_NONE = 0, ///< nothing + MSG_PARAM_PARTITION_HASH = 1 ///< partition hash + +} dsn_msg_parameter_type_t; + /*! RPC message context */ typedef union dsn_msg_context_t { struct { - uint64_t is_request : 1; ///< whether the RPC message is a request or response - uint64_t is_forwarded : 1; ///< whether the msg is forwarded or not - uint64_t write_replication : 1; ///< whether it is a write request to a replicated service - uint64_t read_replication : 1; ///< whether it is a read request to a replicated service - uint64_t read_semantic : 2; ///< see \ref read_semantic - uint64_t unused : 8; - uint64_t parameter : 50; ///< parameter for the flags, e.g., snapshort decree for replication read + uint64_t is_request : 1; ///< whether the RPC message is a request or response + uint64_t is_forwarded : 1; ///< whether the msg is forwarded or not + uint64_t is_replication_needed: 1; ///< whether state replication is needed for this request + uint64_t unused : 8; ///< not used yet + uint64_t parameter_type : 3; ///< type of the parameter next, see \ref dsn_msg_parameter_type_t + uint64_t parameter : 50; ///< piggybacked parameter for specific flags above } u; - uint64_t context; ///< above flag specific information + uint64_t context; ///< msg_context is of sizeof(uint64_t) } dsn_msg_context_t; +# define DSN_VNID_BUILD(app_id, par_idx) (((uint64_t)(app_id) << 32) | (uint64_t)(par_idx)) +# define DSN_VNID_APP_ID(vnid) ((int)(vnid >> 32)) +# define DSN_VNID_PARTITION_INDEX(vnid) ((int)(vnid & 0x00000000FFFFFFFFULL)) + # define DSN_MSGM_TIMEOUT (0x1 << 0) ///< msg timeout is to be set/get # define DSN_MSGM_HASH (0x1 << 1) ///< thread hash is to be set/get # define DSN_MSGM_VNID (0x1 << 2) ///< virtual node id (vnid) is to be set/get @@ -541,7 
+556,7 @@ typedef union dsn_msg_context_t typedef struct dsn_msg_options_t { int timeout_ms; ///< RPC timeout in milliseconds - int thread_hash; ///< thread hash on RPC server + int request_hash; ///< thread hash on RPC server uint64_t vnid; ///< virtual node id, 0 for none dsn_msg_context_t context; ///< see \ref dsn_msg_context_t } dsn_msg_options_t; diff --git a/include/dsn/cpp/clientlet.h b/include/dsn/cpp/clientlet.h index 173d3657..f0609b3f 100644 --- a/include/dsn/cpp/clientlet.h +++ b/include/dsn/cpp/clientlet.h @@ -301,9 +301,11 @@ namespace dsn TCallback&& callback, int request_hash = 0, std::chrono::milliseconds timeout = std::chrono::milliseconds(0), - int reply_hash = 0) + int reply_hash = 0, + uint64_t partition_hash = 0 + ) { - dsn_message_t msg = dsn_msg_create_request(code, static_cast(timeout.count()), request_hash); + dsn_message_t msg = dsn_msg_create_request(code, static_cast(timeout.count()), request_hash, partition_hash); ::marshall(msg, std::forward(req)); return call(server, msg, owner, std::forward(callback), reply_hash); } @@ -330,9 +332,11 @@ namespace dsn dsn_task_code_t code, TRequest&& req, int request_hash = 0, - std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) + std::chrono::milliseconds timeout = std::chrono::milliseconds(0), + uint64_t partition_hash = 0 + ) { - dsn_message_t msg = dsn_msg_create_request(code, static_cast(timeout.count()), request_hash); + dsn_message_t msg = dsn_msg_create_request(code, static_cast(timeout.count()), request_hash, partition_hash); ::marshall(msg, std::forward(req)); return rpc_message_helper(msg); } @@ -354,15 +358,15 @@ namespace dsn ::dsn::rpc_address server, dsn_task_code_t code, const TRequest& req, - int hash = 0 + int request_hash = 0, + uint64_t partition_hash = 0 ) { - dsn_message_t msg = dsn_msg_create_request(code, 0, hash); + dsn_message_t msg = dsn_msg_create_request(code, 0, request_hash, partition_hash); ::marshall(msg, req); dsn_rpc_call_one_way(server.c_addr(), 
msg); } - - + template std::pair< ::dsn::error_code, TResponse> wait_and_unwrap(task_ptr task) { diff --git a/include/dsn/cpp/rpc_stream.h b/include/dsn/cpp/rpc_stream.h index f2cf8a6b..65e09465 100644 --- a/include/dsn/cpp/rpc_stream.h +++ b/include/dsn/cpp/rpc_stream.h @@ -100,8 +100,8 @@ namespace dsn } // for request - rpc_write_stream(task_code code, int timeout_ms = 0, int hash = 0) - : safe_handle(dsn_msg_create_request(code, timeout_ms, hash), false) + rpc_write_stream(task_code code, int timeout_ms = 0, int request_hash = 0, uint64_t partition_hash = 0) + : safe_handle(dsn_msg_create_request(code, timeout_ms, request_hash, partition_hash), false) { _last_write_next_committed = true; _last_write_next_total_size = 0; diff --git a/include/dsn/internal/rpc_message.h b/include/dsn/internal/rpc_message.h index 0f6da63b..5cdf528e 100644 --- a/include/dsn/internal/rpc_message.h +++ b/include/dsn/internal/rpc_message.h @@ -133,7 +133,12 @@ namespace dsn // routines for create messages // static message_ex* create_receive_message(const blob& data); - static message_ex* create_request(dsn_task_code_t rpc_code, int timeout_milliseconds = 0, int hash = 0); + static message_ex* create_request( + dsn_task_code_t rpc_code, + int timeout_milliseconds = 0, + int request_hash = 0, + uint64_t partition_hash = 0 + ); message_ex* create_response(); message_ex* copy(); message_ex* copy_and_prepare_send(); diff --git a/include/dsn/internal/task_spec.h b/include/dsn/internal/task_spec.h index c11c1f9d..8128a4de 100644 --- a/include/dsn/internal/task_spec.h +++ b/include/dsn/internal/task_spec.h @@ -211,6 +211,10 @@ class task_spec : public extensible_object throttling_mode_t rpc_request_throttling_mode; // std::vector rpc_request_delays_milliseconds; // see exp_delay for delaying recving bool rpc_request_dropped_before_execution_when_timeout; + + // layer 2 configurations + bool rpc_request_is_replicated_write_operation; // need stateful replication + // ] task_rejection_handler 
rejection_handler; @@ -267,6 +271,10 @@ CONFIG_BEGIN(task_spec) CONFIG_FLD_ENUM(throttling_mode_t, rpc_request_throttling_mode, TM_NONE, TM_INVALID, false, "throttling mode for rpc requets: TM_NONE, TM_REJECT, TM_DELAY when queue length > pool.queue_length_throttling_threshold") CONFIG_FLD_INT_LIST(rpc_request_delays_milliseconds, "how many milliseconds to delay recving rpc session for when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, e.g., 0, 0, 1, 2, 5, 10") CONFIG_FLD(bool, bool, rpc_request_dropped_before_execution_when_timeout, false, "whether to drop a request right before execution when its queueing time is already greater than its timeout value") + + // layer 2 configurations + CONFIG_FLD(bool, bool, rpc_request_is_replicated_write_operation, false, "whether this is a replicated and write request") + CONFIG_END struct threadpool_spec diff --git a/src/core/core/rpc_message.cpp b/src/core/core/rpc_message.cpp index b61dfa46..64a837a3 100644 --- a/src/core/core/rpc_message.cpp +++ b/src/core/core/rpc_message.cpp @@ -47,10 +47,14 @@ using namespace dsn::utils; # define __TITLE__ "message" #define CRC_INVALID 0xdead0c2c -DSN_API dsn_message_t dsn_msg_create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int hash) +DSN_API dsn_message_t dsn_msg_create_request( + dsn_task_code_t rpc_code, + int timeout_milliseconds, + int request_hash, + uint64_t partition_hash + ) { - auto msg = ::dsn::message_ex::create_request(rpc_code, timeout_milliseconds, hash); - return msg; + return ::dsn::message_ex::create_request(rpc_code, timeout_milliseconds, request_hash, partition_hash); } DSN_API dsn_message_t dsn_msg_copy(dsn_message_t msg) @@ -134,7 +138,7 @@ DSN_API void dsn_msg_set_options( if (mask & DSN_MSGM_HASH) { - hdr->client.hash = opts->thread_hash; + hdr->client.hash = opts->request_hash; } if (mask & DSN_MSGM_VNID) @@ -155,7 +159,7 @@ DSN_API void dsn_msg_get_options( { auto hdr = 
((::dsn::message_ex*)msg)->header; opts->context = hdr->context; - opts->thread_hash = hdr->client.hash; + opts->request_hash = hdr->client.hash; opts->timeout_ms = hdr->client.timeout_ms; opts->vnid = hdr->vnid; } @@ -389,7 +393,7 @@ message_ex* message_ex::copy_and_prepare_send() return copy; } -message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int hash) +message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_milliseconds, int request_hash, uint64_t partition_hash) { message_ex* msg = new message_ex(); msg->_is_read = false; @@ -398,10 +402,9 @@ message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_mil // init header auto& hdr = *msg->header; memset(&hdr, 0, sizeof(hdr)); - hdr.hdr_crc32 = hdr.body_crc32 = CRC_INVALID; - - if (0 != hash) - hdr.client.hash = hash; + hdr.hdr_crc32 = hdr.body_crc32 = CRC_INVALID; + + hdr.client.hash = request_hash; if (0 == timeout_milliseconds) { @@ -412,12 +415,20 @@ message_ex* message_ex::create_request(dsn_task_code_t rpc_code, int timeout_mil hdr.client.timeout_ms = timeout_milliseconds; } - strncpy(hdr.rpc_name, dsn_task_code_to_string(rpc_code), sizeof(hdr.rpc_name)); + task_spec* sp = task_spec::get(rpc_code); + strncpy(hdr.rpc_name, sp->name.c_str(), sizeof(hdr.rpc_name)); hdr.rpc_name_fast.local_rpc_id = (uint32_t)rpc_code; hdr.rpc_name_fast.local_hash = s_local_hash; hdr.id = new_id(); + hdr.context.u.is_request = true; + if (0 != partition_hash) + { + hdr.context.u.parameter_type = MSG_PARAM_PARTITION_HASH; + hdr.context.u.parameter = partition_hash; + } + hdr.context.u.is_replication_needed = sp->rpc_request_is_replicated_write_operation; msg->local_rpc_code = (uint32_t)rpc_code; return msg; From 731820883dff081b29facdcfb62e11ab2c2eff9b Mon Sep 17 00:00:00 2001 From: Zhenyu Guo Date: Mon, 22 Feb 2016 00:01:00 +0800 Subject: [PATCH 3/4] fix request hash naming in replication app base --- 
.../replication/client_lib/replication_app_client_base.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dist/replication/client_lib/replication_app_client_base.cpp b/src/dist/replication/client_lib/replication_app_client_base.cpp index 0fc386b2..4b78e253 100644 --- a/src/dist/replication/client_lib/replication_app_client_base.cpp +++ b/src/dist/replication/client_lib/replication_app_client_base.cpp @@ -322,7 +322,7 @@ void replication_app_client_base::call_with_address(dsn::rpc_address addr, reque dsn_msg_options_t opts; opts.timeout_ms = request->timeout_ms; - opts.thread_hash = gpid_to_hash(request->read_header.gpid); + opts.request_hash = gpid_to_hash(request->read_header.gpid); opts.vnid = *(uint64_t*)(&request->read_header.gpid); dsn_msg_set_options(request->request, &opts, DSN_MSGM_HASH | DSN_MSGM_TIMEOUT); // TODO: not supported yet DSN_MSGM_VNID); } @@ -336,7 +336,7 @@ void replication_app_client_base::call_with_address(dsn::rpc_address addr, reque dsn_msg_options_t opts; opts.timeout_ms = request->timeout_ms; - opts.thread_hash = gpid_to_hash(request->write_header.gpid); + opts.request_hash = gpid_to_hash(request->write_header.gpid); opts.vnid = *(uint64_t*)(&request->write_header.gpid); dsn_msg_set_options(request->request, &opts, DSN_MSGM_HASH | DSN_MSGM_TIMEOUT); // TODO: not supported yet DSN_MSGM_VNID | DSN_MSGM_CONTEXT); From 8e7227ce71d28fdfad7a3f7939e5e9617dedae0f Mon Sep 17 00:00:00 2001 From: Zhenyu Guo Date: Mon, 22 Feb 2016 00:09:45 +0800 Subject: [PATCH 4/4] fix name change of thread_hash to request_hash --- src/core/tests/rpc_message.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/tests/rpc_message.cpp b/src/core/tests/rpc_message.cpp index 579c32f0..c4bab246 100644 --- a/src/core/tests/rpc_message.cpp +++ b/src/core/tests/rpc_message.cpp @@ -213,7 +213,7 @@ TEST(core, message_ex) dsn_msg_get_options(request, &opts); ASSERT_EQ(100, opts.timeout_ms); - ASSERT_EQ(1, opts.thread_hash); 
+ ASSERT_EQ(1, opts.request_hash); ASSERT_EQ(333u, opts.vnid); ASSERT_EQ(444u, opts.context.context);