-
Notifications
You must be signed in to change notification settings - Fork 190
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Centralize exception handling and fix behavior for RequestTimeoutException #3063
Changes from 4 commits
85f9e90
908b7c8
1a2b993
b389de2
c70ac58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper; | ||
|
||
import com.linecorp.armeria.common.RequestContext; | ||
import com.linecorp.armeria.common.annotation.Nullable; | ||
import com.linecorp.armeria.common.grpc.GrpcStatusFunction; | ||
import com.linecorp.armeria.server.RequestTimeoutException; | ||
import io.grpc.Metadata; | ||
import io.grpc.Status; | ||
import io.micrometer.core.instrument.Counter; | ||
import org.opensearch.dataprepper.exceptions.BadRequestException; | ||
import org.opensearch.dataprepper.exceptions.BufferWriteException; | ||
import org.opensearch.dataprepper.exceptions.RequestCancelledException; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.buffer.SizeOverflowException; | ||
|
||
import java.util.concurrent.TimeoutException; | ||
|
||
public class GrpcRequestExceptionHandler implements GrpcStatusFunction { | ||
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full."; | ||
static final String DEFAULT_MESSAGE = ""; | ||
|
||
public static final String REQUEST_TIMEOUTS = "requestTimeouts"; | ||
public static final String BAD_REQUESTS = "badRequests"; | ||
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge"; | ||
public static final String INTERNAL_SERVER_ERROR = "internalServerError"; | ||
|
||
private final Counter requestTimeoutsCounter; | ||
private final Counter badRequestsCounter; | ||
private final Counter requestsTooLargeCounter; | ||
private final Counter internalServerErrorCounter; | ||
|
||
public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { | ||
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); | ||
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); | ||
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); | ||
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); | ||
} | ||
|
||
@Override | ||
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) { | ||
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception; | ||
|
||
return handleExceptions(exceptionCause); | ||
} | ||
|
||
private Status handleExceptions(final Throwable e) { | ||
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) { | ||
requestTimeoutsCounter.increment(); | ||
return createStatus(e, Status.RESOURCE_EXHAUSTED); | ||
} else if (e instanceof SizeOverflowException) { | ||
requestsTooLargeCounter.increment(); | ||
return createStatus(e, Status.RESOURCE_EXHAUSTED); | ||
} else if (e instanceof BadRequestException) { | ||
badRequestsCounter.increment(); | ||
return createStatus(e, Status.INVALID_ARGUMENT); | ||
} else if (e instanceof RequestCancelledException) { | ||
requestTimeoutsCounter.increment(); | ||
return createStatus(e, Status.CANCELLED); | ||
} | ||
|
||
internalServerErrorCounter.increment(); | ||
return createStatus(e, Status.INTERNAL); | ||
} | ||
|
||
private Status createStatus(final Throwable e, final Status status) { | ||
final String message; | ||
if (e instanceof RequestTimeoutException) { | ||
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE; | ||
} else { | ||
message = e.getMessage() == null ? DEFAULT_MESSAGE : e.getMessage(); | ||
} | ||
|
||
return status.withDescription(message); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,12 @@ | |
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.source.loghttp; | ||
package org.opensearch.dataprepper; | ||
|
||
import com.linecorp.armeria.common.HttpRequest; | ||
import com.linecorp.armeria.server.RequestTimeoutException; | ||
import com.linecorp.armeria.server.ServiceRequestContext; | ||
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.buffer.SizeOverflowException; | ||
import com.linecorp.armeria.common.HttpResponse; | ||
|
@@ -16,7 +20,10 @@ | |
import java.util.Objects; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
public class RequestExceptionHandler { | ||
public class HttpRequestExceptionHandler implements ExceptionHandlerFunction { | ||
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full."; | ||
static final String DEFAULT_MESSAGE = ""; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any particular reason this is empty? Should we have something generic like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure. I just pulled this out into a constant from the original implementation. This empty string could be populated for non-5XX exceptions as well so I'm not sure if there's a good generic message. Open to suggestions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand you are saying this was the default behavior. |
||
|
||
public static final String REQUEST_TIMEOUTS = "requestTimeouts"; | ||
public static final String BAD_REQUESTS = "badRequests"; | ||
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge"; | ||
|
@@ -27,24 +34,31 @@ public class RequestExceptionHandler { | |
private final Counter requestsTooLargeCounter; | ||
private final Counter internalServerErrorCounter; | ||
|
||
public RequestExceptionHandler(final PluginMetrics pluginMetrics) { | ||
public HttpRequestExceptionHandler(final PluginMetrics pluginMetrics) { | ||
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); | ||
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); | ||
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); | ||
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); | ||
} | ||
|
||
public HttpResponse handleException(final Exception e) { | ||
final String message = e.getMessage() == null? "" : e.getMessage(); | ||
return handleException(e, message); | ||
@Override | ||
public HttpResponse handleException(final ServiceRequestContext ctx, final HttpRequest req, final Throwable cause) { | ||
final String message; | ||
if (cause instanceof RequestTimeoutException) { | ||
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE; | ||
} else { | ||
message = cause.getMessage() == null ? DEFAULT_MESSAGE : cause.getMessage(); | ||
} | ||
|
||
return handleException(cause, message); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the default message issue can be resolved here. This private Then this line here becomes:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Will refactor |
||
} | ||
|
||
public HttpResponse handleException(final Exception e, final String message) { | ||
private HttpResponse handleException(final Throwable e, final String message) { | ||
Objects.requireNonNull(message); | ||
if (e instanceof IOException) { | ||
badRequestsCounter.increment(); | ||
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, message); | ||
} else if (e instanceof TimeoutException) { | ||
} else if (e instanceof TimeoutException || e instanceof RequestTimeoutException) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the difference between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TimeoutException is thrown by the BlockingBuffer, RequestTimeoutException is thrown by Armeria when the requestTimeout has expired |
||
requestTimeoutsCounter.increment(); | ||
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message); | ||
} else if (e instanceof SizeOverflowException) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.exceptions; | ||
|
||
public class BadRequestException extends RuntimeException { | ||
public BadRequestException(final String message, final Throwable cause) { | ||
super(message, cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.exceptions; | ||
|
||
public class BufferWriteException extends RuntimeException { | ||
public BufferWriteException(final String message, final Throwable cause) { | ||
super(message, cause); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.exceptions; | ||
|
||
public class RequestCancelledException extends RuntimeException { | ||
public RequestCancelledException(final String message) { | ||
super(message); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper; | ||
|
||
import com.linecorp.armeria.common.RequestContext; | ||
import com.linecorp.armeria.server.RequestTimeoutException; | ||
import io.grpc.Metadata; | ||
import io.grpc.Status; | ||
import io.micrometer.core.instrument.Counter; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
import org.opensearch.dataprepper.exceptions.BadRequestException; | ||
import org.opensearch.dataprepper.exceptions.BufferWriteException; | ||
import org.opensearch.dataprepper.exceptions.RequestCancelledException; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.buffer.SizeOverflowException; | ||
|
||
import java.io.IOException; | ||
import java.util.UUID; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.when; | ||
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.ARMERIA_REQUEST_TIMEOUT_MESSAGE; | ||
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.DEFAULT_MESSAGE; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
public class GrpcRequestExceptionHandlerTest { | ||
@Mock | ||
private PluginMetrics pluginMetrics; | ||
|
||
@Mock | ||
private Counter requestTimeoutsCounter; | ||
|
||
@Mock | ||
private Counter badRequestsCounter; | ||
|
||
@Mock | ||
private Counter requestsTooLargeCounter; | ||
|
||
@Mock | ||
private Counter internalServerErrorCounter; | ||
|
||
@Mock | ||
private RequestContext requestContext; | ||
|
||
@Mock | ||
private Metadata metadata; | ||
|
||
private GrpcRequestExceptionHandler grpcRequestExceptionHandler; | ||
|
||
@BeforeEach | ||
public void setUp() { | ||
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter); | ||
when(pluginMetrics.counter(HttpRequestExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter); | ||
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter); | ||
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter); | ||
|
||
grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); | ||
} | ||
|
||
@Test | ||
public void testHandleBadRequestException() { | ||
final BadRequestException badRequestExceptionNoMessage = new BadRequestException(null, new IOException()); | ||
final String exceptionMessage = UUID.randomUUID().toString(); | ||
final BadRequestException badRequestExceptionWithMessage = new BadRequestException(exceptionMessage, new IOException()); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE)); | ||
|
||
final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionWithMessage, metadata); | ||
assertThat(messageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT)); | ||
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); | ||
|
||
verify(badRequestsCounter, times(2)).increment(); | ||
} | ||
|
||
@Test | ||
public void testHandleTimeoutException() { | ||
final BufferWriteException timeoutExceptionNoMessage = new BufferWriteException(null, new TimeoutException()); | ||
final String exceptionMessage = UUID.randomUUID().toString(); | ||
final BufferWriteException timeoutExceptionWithMessage = new BufferWriteException(exceptionMessage, new TimeoutException(exceptionMessage)); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE)); | ||
|
||
final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionWithMessage, metadata); | ||
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED)); | ||
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); | ||
|
||
verify(requestTimeoutsCounter, times(2)).increment(); | ||
} | ||
|
||
@Test | ||
public void testHandleArmeriaTimeoutException() { | ||
final RequestTimeoutException timeoutExceptionNoMessage = RequestTimeoutException.get(); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(ARMERIA_REQUEST_TIMEOUT_MESSAGE)); | ||
|
||
verify(requestTimeoutsCounter, times(1)).increment(); | ||
} | ||
|
||
@Test | ||
public void testHandleSizeOverflowException() { | ||
final BufferWriteException sizeOverflowExceptionNoMessage = new BufferWriteException(null, new SizeOverflowException(null)); | ||
final String exceptionMessage = UUID.randomUUID().toString(); | ||
final BufferWriteException sizeOverflowExceptionWithMessage = new BufferWriteException(exceptionMessage, new SizeOverflowException(exceptionMessage)); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE)); | ||
|
||
final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionWithMessage, metadata); | ||
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED)); | ||
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); | ||
|
||
verify(requestsTooLargeCounter, times(2)).increment(); | ||
} | ||
|
||
@Test | ||
public void testHandleRequestCancelledException() { | ||
final RequestCancelledException requestCancelledExceptionNoMessage = new RequestCancelledException(null); | ||
final String exceptionMessage = UUID.randomUUID().toString(); | ||
final RequestCancelledException requestCancelledExceptionWithMessage = new RequestCancelledException(exceptionMessage); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.CANCELLED)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE)); | ||
|
||
final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionWithMessage, metadata); | ||
assertThat(messageStatus.getCode(), equalTo(Status.Code.CANCELLED)); | ||
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); | ||
|
||
verify(requestTimeoutsCounter, times(2)).increment(); | ||
} | ||
|
||
@Test | ||
public void testHandleInternalServerException() { | ||
final RuntimeException runtimeExceptionNoMessage = new RuntimeException(); | ||
final String exceptionMessage = UUID.randomUUID().toString(); | ||
final RuntimeException runtimeExceptionWithMessage = new RuntimeException(exceptionMessage); | ||
|
||
final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionNoMessage, metadata); | ||
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INTERNAL)); | ||
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE)); | ||
|
||
final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionWithMessage, metadata); | ||
assertThat(messageStatus.getCode(), equalTo(Status.Code.INTERNAL)); | ||
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage)); | ||
|
||
verify(internalServerErrorCounter, times(2)).increment(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This
DEFAULT_MESSAGE
could possibly be improved since thehandleExceptions
method already determines what type it is. So we can probably removeDEFAULT_MESSAGE
and derive it from theStatus
.Assuming that
Status::toString
returns a useful string, this could be:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one's a little trickier. The
toString
method isThe cause and description are null by default. We could add the whole exception as the cause but I don't think we want to return the stacktrace in the HTTP response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add something generic like
Unexpected exception encountered: <exception class>
. Let me know what your thoughts areThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just
code.name()
instead ofStatus.toString()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is,
status.getCode.name()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, will do