Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-7960: XDCR on pipelined memcached binary protocol #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions include/xdc_replicator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
%% License for the specific language governing permissions and limitations under
%% the License.

-ifndef(_XDC_COMMON__HRL_).
-define(_XDC_COMMON__HRL_,).

%% couchdb headers
-include("couch_db.hrl").
-include("couch_js_functions.hrl").
Expand All @@ -26,16 +29,31 @@
to_binary/1
]).

%% constants used by XDCR
%% ------------------------------------%%
%% constants and macros used by XDCR %%
%% ------------------------------------%%
-define(REP_ID_VERSION, 2).
%% capture the last 10 entries of checkpoint history per bucket replicator
-define(XDCR_CHECKPOINT_HISTORY, 10).
%% capture the last 10 entries of error history per bucket replicator
-define(XDCR_ERROR_HISTORY, 10).
%% interval (secs) to compute rate stats
-define(XDCR_RATE_STAT_INTERVAL, 1).
%% constants used by XMEM
-define(XDCR_XMEM_CONNECTION_ATTEMPTS, 16).
-define(XDCR_XMEM_CONNECTION_TIMEOUT, 120000). %% timeout in ms
%% builder of error/warning/debug msgs
-define(format_msg(Msg, Args), lists:flatten(io_lib:format(Msg, Args))).

%% by default we reply on remote memcached to do conflict resolution,
%% leave a swtich to if do local conflict resolution in case it is necessary
-define(XDCR_LOCAL_CONFLICT_RESOLUTION, false).


%% data structures

%% -------------------------%%
%% XDCR data structures %%
%% -------------------------%%

%% replication settings used by bucket level and vbucket level replicators
-record(rep, {
Expand Down Expand Up @@ -134,6 +152,7 @@
%% bucket level replication state used by module xdc_replication
-record(replication, {
rep = #rep{}, % the basic replication settings
mode, % replication mode
vbucket_sup, % the supervisor for vb replicators
vbs = [], % list of vb we should be replicating
num_tokens = 0, % number of available tokens used by throttles
Expand All @@ -156,6 +175,12 @@
status = #rep_vb_status{},
%% time the vb replicator intialized
rep_start_time,

%% xmem server process
xmem_srv,
%% remote node
xmem_remote,

throttle,
parent,
source_name,
Expand Down Expand Up @@ -234,6 +259,7 @@
target = #httpdb{}, %% target db
changes_manager, %% process to queue changes from storage
max_conns, %% max connections
xmem_server, %% XMem server process
opt_rep_threshold %% optimistic replication threshold
}).

Expand All @@ -246,3 +272,56 @@
worker_item_checked = 0,
worker_item_replicated = 0
}).

%%-----------------------------------------%%
%% XDCR-MEMCACHED %%
%%-----------------------------------------%%
% statistics
-record(xdc_vb_rep_xmem_statistics, {
item_replicated = 0,
data_replicated = 0,
ckpt_issued = 0,
ckpt_failed = 0
}).

%% information needed talk to remote memcached
-record(xdc_rep_xmem_remote, {
ip, %% inet:ip_address(),
port, %% inet:port_number(),
bucket = "default",
username = "_admin",
password = "_admin",
options = []
}).

%% xmem server state
-record(xdc_vb_rep_xmem_srv_state, {
vb,
parent_vb_rep,
num_workers,
pid_workers,
statistics = #xdc_vb_rep_xmem_statistics{},
remote = #xdc_rep_xmem_remote{},
seed,
enable_pipeline = false,
error_reports
}).

%% xmem worker state
-record(xdc_vb_rep_xmem_worker_state, {
id,
vb,
parent_server_pid,
status,
statistics = #xdc_vb_rep_xmem_statistics{},
socket, %% inet:socket(),
time_connected,
time_init,
options,
error_reports
}).


-endif.

%% end of xdc_replicator.hrl
6 changes: 5 additions & 1 deletion src/mc_client_binary.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@
disable_traffic/1,
wait_for_checkpoint_persistence/3,
get_tap_docs_estimate/3,
get_mass_tap_docs_estimate/2
map_status/1,
process_error_response/1,
get_mass_tap_docs_estimate/2,
ext/2,
rev_to_mcd_ext/1
]).

-type recv_callback() :: fun((_, _, _) -> any()) | undefined.
Expand Down
11 changes: 11 additions & 0 deletions src/ns_config_default.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ default() ->
%% when doc body size is no greater than the threshold
{xdcr_optimistic_replication_threshold, 256},

%% xdcr replication mode:
%% "capi": replicating to ns_server:capi_replication layer
%% "xmem": replciating to mecached directly
{xdcr_replication_mode, "xmem"},
%% # of worker processes per vb rep xmem server
{xdcr_xmem_worker, 1},
%% enable pipelined memcached operations
{xdcr_enable_pipeline_ops, true},
%% inverse of probability to dump non-critical trace
{xdcr_trace_dump_inverse_prob, 1000},

{directory, path_config:component_path(data, "config")},
{index_aware_rebalance_disabled, false},
{max_bucket_count, 10},
Expand Down
6 changes: 5 additions & 1 deletion src/remote_clusters_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
%% Construct remote bucket reference that can be used by
%% get_remote_bucket_by_ref functions.
%%
%% - get_memcached_vbucket_info_by_ref/4
%% - get_memcached_vbucket_info_by_ref/{3, 4}
%%
%% -> {ok, {Host :: binary(), MemcachedPort :: integer()}, #remote_bucket{}}
%% | {error, _} | {error, _, _} % see get_remote_bucket_by_ref for errors
Expand Down Expand Up @@ -86,6 +86,7 @@
remote_bucket_reference/2, parse_remote_bucket_reference/1,
invalidate_remote_bucket/2, invalidate_remote_bucket_by_ref/1,
find_cluster_by_uuid/1,
get_memcached_vbucket_info_by_ref/3,
get_memcached_vbucket_info_by_ref/4]).

%% gen_server callbacks
Expand Down Expand Up @@ -251,6 +252,9 @@ invalidate_remote_bucket(ClusterName, Bucket) ->
{invalidate_remote_bucket, Cluster, Bucket}, infinity)
end.

get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket) ->
get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket, ?GET_BUCKET_TIMEOUT).

get_memcached_vbucket_info_by_ref(Reference, ForceRefresh, VBucket, Timeout) ->
case get_remote_bucket_by_ref(Reference, ForceRefresh, Timeout) of
{ok, RemoteBucket} ->
Expand Down
18 changes: 17 additions & 1 deletion src/xdc_rep_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,22 @@ dump_parameters() ->
{value, DefaultRestartWaitTime} = ns_config:search(xdcr_failure_restart_interval),
RestartWaitTime = misc:getenv_int("XDCR_FAILURE_RESTART_INTERVAL", DefaultRestartWaitTime),

RepMode = xdc_rep_utils:get_replication_mode(),
OptRepThreshold = xdc_rep_utils:get_opt_replication_threshold(),

{NumXMemWorker, Pipeline}
= case RepMode of
"xmem" ->
DefNumXMemWorker = xdc_rep_utils:get_xmem_worker(),
EnablePipeline = xdc_rep_utils:enable_pipeline_ops(),
{DefNumXMemWorker, EnablePipeline};
_ ->
{undefined, undefined}
end,

?xdcr_debug("default XDCR parameters:~n \t"
"replication mode: ~p (pipleline: ~p, "
"num xmem worker per vb replicator: ~p);~n \t"
"optimistic replication threshold: ~p bytes;~n \t"
"number of max concurrent reps per bucket: ~p;~n \t"
"checkpoint interval in secs: ~p;~n \t"
Expand All @@ -315,7 +328,10 @@ dump_parameters() ->
"max number HTTP connections per vb replicator: ~p;~n \t"
"max number retries per connection: ~p;~n \t"
"vb replicator waiting time before restart: ~p ",
[OptRepThreshold,
[RepMode,
Pipeline,
NumXMemWorker,
OptRepThreshold,
MaxConcurrentReps,
IntervalSecs,
DefBatchSize, DocBatchSizeKB,
Expand Down
60 changes: 58 additions & 2 deletions src/xdc_rep_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
-export([get_master_db/1, get_checkpoint_log_id/2]).
-export([get_opt_replication_threshold/0]).
-export([update_options/1]).
-export([get_replication_mode/0, get_replication_batch_size/0]).
-export([enable_pipeline_ops/0, get_trace_dump_invprob/0]).
-export([get_xmem_worker/0]).

-include("xdc_replicator.hrl").

Expand Down Expand Up @@ -169,10 +172,11 @@ make_options(Props) ->
"optimistic replication threshold: ~p bytes, "
"worker processes: ~p, "
"worker batch size (# of mutations): ~p, "
"socket options: ~p "
"HTTP connections: ~p, "
"connection timeout (ms): ~p,"
"num of retries per request: ~p]",
[OptRepThreshold, DefWorkers, DefBatchSize, DefConns, DefTimeout, DefRetries]),
[OptRepThreshold, DefWorkers, DefBatchSize, DefSocketOptions, DefConns, DefTimeout, DefRetries]),

lists:ukeymerge(1, Options, lists:keysort(1, [
{connection_timeout, DefTimeout},
Expand Down Expand Up @@ -321,7 +325,6 @@ get_opt_replication_threshold() ->
Threshold
end.


%% get xdc replication options, log them if changed
-spec update_options(list()) -> list().
update_options(Options) ->
Expand Down Expand Up @@ -396,5 +399,58 @@ update_options(Options) ->
{worker_processes, DefWorkers},
{opt_rep_threshold, Threshold}]), Options).

-spec get_replication_mode() -> list().
get_replication_mode() ->
{value, DefaultRepMode} = ns_config:search(xdcr_replication_mode),

EnvVar = case (catch string:to_lower(os:getenv("XDCR_REPLICATION_MODE"))) of
"capi" ->
"capi";
"xmem" ->
"xmem";
_ ->
undefined
end,

%% env var overrides ns_config parameter, use default ns_config parameter
%% only when env var is undefined
case EnvVar of
undefined ->
DefaultRepMode;
_ ->
EnvVar
end.

-spec get_replication_batch_size() -> integer().
get_replication_batch_size() ->
%% env parameter can override the ns_config parameter
{value, DefaultDocBatchSize} = ns_config:search(xdcr_worker_batch_size),
DocBatchSize = misc:getenv_int("XDCR_WORKER_BATCH_SIZE", DefaultDocBatchSize),
1024*DocBatchSize.

-spec enable_pipeline_ops() -> boolean().
enable_pipeline_ops() ->
%% env parameter can override the ns_config parameter
{value, EnablePipeline} = ns_config:search(xdcr_enable_pipeline_ops),
case os:getenv("XDCR_ENABLE_PIPELINE") of
"true" ->
true;
"false" ->
false;
_ ->
EnablePipeline
end.

%% inverse probability to dump non-critical datapath trace,
%% trace will be dumped by probability 1/N
-spec get_trace_dump_invprob() -> integer().
get_trace_dump_invprob() ->
%% env parameter can override the ns_config parameter
{value, DefInvProb} = ns_config:search(xdcr_trace_dump_inverse_prob),
misc:getenv_int("XDCR_TRACE_DUMP_INVERSE_PROB", DefInvProb).

-spec get_xmem_worker() -> integer().
get_xmem_worker() ->
%% env parameter can override the ns_config parameter
{value, DefNumXMemWorker} = ns_config:search(xdcr_xmem_worker),
misc:getenv_int("XDCR_XMEM_WORKER", DefNumXMemWorker).
8 changes: 6 additions & 2 deletions src/xdc_replication.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) ->
(_Evt) ->
ok
end,

RepMode = xdc_rep_utils:get_replication_mode(),
{ok, _} = couch_db_update_notifier:start_link(NotifyFun),
?xdcr_debug("couch_db update notifier started", []),
{ok, InitThrottle} = concurrency_throttle:start_link(MaxConcurrentReps, self()),
Expand All @@ -85,6 +85,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) ->
{ok, SrcBucketConfig} ->
Vbs = xdc_rep_utils:my_active_vbuckets(SrcBucketConfig),
RepState0 = #replication{rep = Rep,
mode = RepMode,
vbs = Vbs,
num_tokens = MaxConcurrentReps,
init_throttle = InitThrottle,
Expand All @@ -95,6 +96,7 @@ init([#rep{source = SrcBucketBinary} = Rep]) ->
?xdcr_error("fail to fetch a valid bucket config and no vb replicator "
"would be created (error: ~p)", [Error]),
RepState = #replication{rep = Rep,
mode = RepMode,
num_tokens = MaxConcurrentReps,
init_throttle = InitThrottle,
work_throttle = WorkThrottle,
Expand Down Expand Up @@ -384,6 +386,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.

start_vb_replicators(#replication{rep = Rep,
mode = RepMode,
vbucket_sup = Sup,
init_throttle = InitThrottle,
work_throttle = WorkThrottle,
Expand All @@ -408,7 +411,8 @@ start_vb_replicators(#replication{rep = Rep,
Vb,
InitThrottle,
WorkThrottle,
self())
self(),
RepMode)
end, misc:shuffle(NewVbs)),
Replication#replication{vb_rep_dict = Dict2}.

Expand Down
Loading