[FLINK-36143][ES6][ES7] Introduce retry-on-conflict param to resolve "version conflict" errors in the Elasticsearch sink #109

Open · wants to merge 2 commits into main
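How the option would be used from SQL, as a minimal sketch: it only takes effect on the upsert path, i.e. when the sink table declares a primary key and rows are written as Elasticsearch UpdateRequests. Everything below except the 'sink.retries-on-conflict' key itself is hypothetical (cluster address, table name, schema, index). With the default of 0, a concurrent update between Elasticsearch's internal get and reindex of a document fails the bulk item with a version conflict; a positive value lets Elasticsearch retry the update that many times before reporting the failure.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RetriesOnConflictExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Sink table with a primary key, so rows are written as upserts (UpdateRequest).
        // 'sink.retries-on-conflict' is the option added by this PR; it defaults to 0.
        tEnv.executeSql(
                "CREATE TABLE users_sink ("
                        + " user_id STRING,"
                        + " visit_cnt BIGINT,"
                        + " PRIMARY KEY (user_id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'elasticsearch-7',"
                        + " 'hosts' = 'http://localhost:9200',"
                        + " 'index' = 'users',"
                        + " 'sink.retries-on-conflict' = '3'"
                        + ")");
    }
}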
@@ -46,6 +46,7 @@
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
@@ -129,4 +130,8 @@ public List<HttpHost> getHosts() {
public Optional<Integer> getParallelism() {
return config.getOptional(SINK_PARALLELISM);
}

public int getRetriesOnConflict() {
return config.getOptional(RETRIES_ON_CONFLICT_OPTION).orElse(0);
}
}
@@ -145,4 +145,12 @@ public class ElasticsearchConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");

public static final ConfigOption<Integer> RETRIES_ON_CONFLICT_OPTION =
ConfigOptions.key("sink.retries-on-conflict")
.intType()
.defaultValue(0)
.withDescription(
"Sets the number of retries of a version conflict occurs "
+ "because the document was updated between getting it and updating it. Defaults to 0.");
}
@@ -127,7 +127,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
documentType,
createKeyExtractor());
createKeyExtractor(),
config.getRetriesOnConflict());

ElasticsearchSinkBuilderBase<RowData, ? extends ElasticsearchSinkBuilderBase> builder =
builderSupplier.get();
@@ -63,6 +63,7 @@
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
@@ -225,7 +226,8 @@ public Set<ConfigOption<?>> optionalOptions() {
DELIVERY_GUARANTEE_OPTION,
PASSWORD_OPTION,
USERNAME_OPTION,
SINK_PARALLELISM)
SINK_PARALLELISM,
RETRIES_ON_CONFLICT_OPTION)
.collect(Collectors.toSet());
}

@@ -50,18 +50,21 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter<RowData> {
private final XContentType contentType;
@Nullable private final String documentType;
private final Function<RowData, String> createKey;
private final int retriesOnConflict;

public RowElasticsearchEmitter(
IndexGenerator indexGenerator,
SerializationSchema<RowData> serializationSchema,
XContentType contentType,
@Nullable String documentType,
Function<RowData, String> createKey) {
Function<RowData, String> createKey,
int retriesOnConflict) {
this.indexGenerator = checkNotNull(indexGenerator);
this.serializationSchema = checkNotNull(serializationSchema);
this.contentType = checkNotNull(contentType);
this.documentType = documentType;
this.createKey = checkNotNull(createKey);
this.retriesOnConflict = retriesOnConflict;
}

@Override
@@ -109,7 +112,8 @@ private void processUpsert(RowData row, RequestIndexer indexer) {
final UpdateRequest updateRequest =
new UpdateRequest(indexGenerator.generate(row), documentType, key)
.doc(document, contentType)
.upsert(document, contentType);
.upsert(document, contentType)
.retryOnConflict(retriesOnConflict);
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =
@@ -38,6 +38,7 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;

/** Accessor methods to elasticsearch options. */
@@ -110,6 +111,10 @@ public Optional<String> getPassword() {
return config.getOptional(PASSWORD_OPTION);
}

public int getRetriesOnConflict() {
// getOptional() is empty when the option is not set explicitly, so fall back to the default of 0
// instead of calling get(), which would throw for tables that omit the option.
return config.getOptional(RETRIES_ON_CONFLICT_OPTION).orElse(0);
}

public boolean isBulkFlushBackoffEnabled() {
return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)
!= ElasticsearchConnectorOptions.BackOffType.DISABLED;
@@ -152,6 +152,14 @@ public class ElasticsearchConnectorOptions {
"The format must produce a valid JSON document. "
+ "Please refer to the documentation on formats for more details.");

public static final ConfigOption<Integer> RETRIES_ON_CONFLICT_OPTION =
ConfigOptions.key("sink.retries-on-conflict")
.intType()
.defaultValue(0)
.withDescription(
"Sets the number of retries of a version conflict occurs "
+ "because the document was updated between getting it and updating it. Defaults to 0.");

// --------------------------------------------------------------------------------------------
// Enums
// --------------------------------------------------------------------------------------------
@@ -51,20 +51,23 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData>
private final XContentType contentType;
private final RequestFactory requestFactory;
private final Function<RowData, String> createKey;
private final int retryOnConflictNum;

public RowElasticsearchSinkFunction(
IndexGenerator indexGenerator,
@Nullable String docType, // this is deprecated in es 7+
SerializationSchema<RowData> serializationSchema,
XContentType contentType,
RequestFactory requestFactory,
Function<RowData, String> createKey) {
Function<RowData, String> createKey,
int retryOnConflictNum) {
this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
this.docType = docType;
this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
this.contentType = Preconditions.checkNotNull(contentType);
this.requestFactory = Preconditions.checkNotNull(requestFactory);
this.createKey = Preconditions.checkNotNull(createKey);
this.retryOnConflictNum = retryOnConflictNum;
}

@Override
@@ -95,8 +98,14 @@ private void processUpsert(RowData row, RequestIndexer indexer) {
final String key = createKey.apply(row);
if (key != null) {
final UpdateRequest updateRequest =
requestFactory.createUpdateRequest(
indexGenerator.generate(row), docType, key, contentType, document);
requestFactory
.createUpdateRequest(
indexGenerator.generate(row),
docType,
key,
contentType,
document)
.retryOnConflict(retryOnConflictNum);
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =
@@ -36,6 +36,7 @@
import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BYTES;
@@ -253,4 +254,29 @@ public void testSinkParallelism() {
(SinkV2Provider) esSink.getSinkRuntimeProvider(new ElasticsearchUtil.MockContext());
assertThat(provider.getParallelism()).hasValue(2);
}

@Test
public void testRetriesOnConflict() {
ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
DynamicTableSink sink =
sinkFactory.createDynamicTableSink(
createPrefilledTestContext()
.withOption(RETRIES_ON_CONFLICT_OPTION.key(), "2")
.build());
assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class);
ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink;

assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(2);
}

@Test
public void testRetriesOnConflictDefault() {
ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory();
DynamicTableSink sink =
sinkFactory.createDynamicTableSink(createPrefilledTestContext().build());
assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class);
ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink;

assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(0);
}
}
@@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
config.getRetriesOnConflict());

final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -68,6 +68,7 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
@@ -105,7 +106,8 @@ public class Elasticsearch6DynamicTableFactory
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
MAX_RETRIES)
MAX_RETRIES,
RETRIES_ON_CONFLICT_OPTION)
.collect(Collectors.toSet());

@Override
@@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
config.getRetriesOnConflict());

final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -67,6 +67,7 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES;
@@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
MAX_RETRIES)
MAX_RETRIES,
RETRIES_ON_CONFLICT_OPTION)
.collect(Collectors.toSet());

@Override