diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0e631d0a8e4c32..d68efb0550a484 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -767,6 +767,8 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); DEFINE_Bool(share_delta_writers, "true"); // timeout for open load stream rpc in ms DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s +// enable write background when using brpc stream +DEFINE_mBool(enable_brpc_stream_write_background, "true"); // brpc streaming max_buf_size in bytes DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB diff --git a/be/src/common/config.h b/be/src/common/config.h index 303e170bde76ee..e2e8b5323e708c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -820,6 +820,8 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); DECLARE_Bool(share_delta_writers); // timeout for open load stream rpc in ms DECLARE_Int64(open_load_stream_timeout_ms); +// enable write background when using brpc stream +DECLARE_mBool(enable_brpc_stream_write_background); // brpc streaming max_buf_size in bytes DECLARE_Int64(load_stream_max_buf_size); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 5751e8308bd3f5..de1ade5aadb47a 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -390,7 +390,9 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) { int64_t delay_ms = dp->param("delay_ms", 1000); bthread_usleep(delay_ms * 1000); }); - ret = brpc::StreamWrite(_stream_id, buf); + brpc::StreamWriteOptions options; + options.write_in_background = config::enable_brpc_stream_write_background; + ret = brpc::StreamWrite(_stream_id, buf, &options); } DBUG_EXECUTE_IF("LoadStreamStub._send_with_retry.stream_write_failed", { ret = EPIPE; }); switch (ret) {