Skip to content

Commit

Permalink
OpenSearchSink - Enhance logs to include index name and last exceptio…
Browse files Browse the repository at this point in the history
…n information (#4841)

* dplive1.yaml

Signed-off-by: Krishna Kondaka <[email protected]>

* Delete .github/workflows/static.yml

Signed-off-by: Krishna Kondaka <[email protected]>

* OpenSearchSink - Enhance logs to include index name and last exception information

Signed-off-by: Krishna Kondaka <[email protected]>

* Rebased to latest and cleanup messages

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed test errors

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka committed Sep 27, 2024
1 parent 5ffc738 commit 891b228
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,21 @@ public final class BulkRetryStrategy {
static class BulkOperationRequestResponse {
final AccumulatingBulkRequest bulkRequest;
final BulkResponse response;
public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response) {
final Exception exception;
public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response, final Exception exception) {
this.bulkRequest = bulkRequest;
this.response = response;
this.exception = exception;
}
AccumulatingBulkRequest getBulkRequest() {
return bulkRequest;
}
BulkResponse getResponse() {
return response;
}
String getExceptionMessage() {
return exception != null ? exception.getMessage() : "-";
}
}

public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction,
Expand Down Expand Up @@ -200,10 +205,12 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte
operationResponse = handleRetry(request, response, attempt);
if (operationResponse != null) {
final long delayMillis = backoff.nextDelayMillis(attempt++);
String exceptionMessage = "";
request = operationResponse.getBulkRequest();
response = operationResponse.getResponse();
exceptionMessage = operationResponse.getExceptionMessage();
if (delayMillis < 0) {
RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries));
RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d. Last exception message: %s)", maxRetries, exceptionMessage));
handleFailures(request, null, e);
break;
}
Expand Down Expand Up @@ -251,13 +258,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if(isItemInError(bulkItemResponse)) {
final ErrorCause error = bulkItemResponse.error();
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : "");
LOG.warn("index = {} operation = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), error != null ? error.reason() : "");
}
}
}
}
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
} else {
handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest);
}
Expand All @@ -273,7 +280,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
continue;
}
LOG.warn("operation = {}, status = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
LOG.warn("index = {}, operation = {}, status = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
}
}
handleFailures(bulkRequest, bulkResponse.items());
Expand Down Expand Up @@ -332,7 +339,7 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
requestToReissue.addOperation(bulkOperation);
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
nonRetryableFailures.add(FailedBulkOperation.builder()
Expand Down Expand Up @@ -368,7 +375,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
if (isItemInError(bulkItemResponse)) {
if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkOperation.getIndex(), bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
} else {
failures.add(FailedBulkOperation.builder()
Expand Down

0 comments on commit 891b228

Please sign in to comment.