Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize exception handling and fix behavior for RequestTimeoutException #3063

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";
static final String DEFAULT_MESSAGE = "";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
public static final String INTERNAL_SERVER_ERROR = "internalServerError";

private final Counter requestTimeoutsCounter;
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
}

internalServerErrorCounter.increment();
return createStatus(e, Status.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = e.getMessage() == null ? DEFAULT_MESSAGE : e.getMessage();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DEFAULT_MESSAGE could possibly be improved since the handleExceptions method already determines what type it is. So we can probably remove DEFAULT_MESSAGE and derive it from the Status.

Assuming that Status::toString returns a useful string, this could be:

message = e.getMessage() == null ? status.toString() : e.getMessage();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one's a little trickier. The toString method is

return MoreObjects.toStringHelper(this)
        .add("code", code.name())
        .add("description", description)
        .add("cause", cause != null ? getStackTraceAsString(cause) : cause)
        .toString();

The cause and description are null by default. We could add the whole exception as the cause but I don't think we want to return the stacktrace in the HTTP response.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add something generic like Unexpected exception encountered: <exception class>. Let me know what your thoughts are

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just code.name() instead of Status.toString()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is, status.getCode.name()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, will do

}

return status.withDescription(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -16,7 +20,10 @@
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class RequestExceptionHandler {
public class HttpRequestExceptionHandler implements ExceptionHandlerFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";
static final String DEFAULT_MESSAGE = "";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any particular reason this is empty? Should we have something generic like "Internal Server Error"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. I just pulled this out into a constant from the original implementation. This empty string could be populated for non-5XX exceptions as well so I'm not sure if there's a good generic message. Open to suggestions

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand you are saying this was the default behavior.


public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
Expand All @@ -27,24 +34,31 @@ public class RequestExceptionHandler {
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public RequestExceptionHandler(final PluginMetrics pluginMetrics) {
public HttpRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

public HttpResponse handleException(final Exception e) {
final String message = e.getMessage() == null? "" : e.getMessage();
return handleException(e, message);
@Override
public HttpResponse handleException(final ServiceRequestContext ctx, final HttpRequest req, final Throwable cause) {
final String message;
if (cause instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = cause.getMessage() == null ? DEFAULT_MESSAGE : cause.getMessage();
}

return handleException(cause, message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default message issue can be resolved here.

This private handleException method could be refactored to return HttpStatus. All the return values appear to generate the same HttpResponse aside from the status.

Then this line here becomes:

HttpStatus status = handleException(cause, /*message no longer needed*/);
return HttpResponse.of(status, MediaType.ANY_TYPE, cause.getMessage() == null ? status.toString() : cause.getMessage());

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will refactor

}

public HttpResponse handleException(final Exception e, final String message) {
private HttpResponse handleException(final Throwable e, final String message) {
Objects.requireNonNull(message);
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, message);
} else if (e instanceof TimeoutException) {
} else if (e instanceof TimeoutException || e instanceof RequestTimeoutException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between TimeoutException and RequestTimeoutException?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeoutException is thrown by the BlockingBuffer, RequestTimeoutException is thrown by Armeria when the requestTimeout has expired

requestTimeoutsCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
} else if (e instanceof SizeOverflowException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BadRequestException extends RuntimeException {
public BadRequestException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BufferWriteException extends RuntimeException {
public BufferWriteException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class RequestCancelledException extends RuntimeException {
public RequestCancelledException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.ARMERIA_REQUEST_TIMEOUT_MESSAGE;
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.DEFAULT_MESSAGE;

@ExtendWith(MockitoExtension.class)
public class GrpcRequestExceptionHandlerTest {
@Mock
private PluginMetrics pluginMetrics;

@Mock
private Counter requestTimeoutsCounter;

@Mock
private Counter badRequestsCounter;

@Mock
private Counter requestsTooLargeCounter;

@Mock
private Counter internalServerErrorCounter;

@Mock
private RequestContext requestContext;

@Mock
private Metadata metadata;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
}

@Test
public void testHandleBadRequestException() {
final BadRequestException badRequestExceptionNoMessage = new BadRequestException(null, new IOException());
final String exceptionMessage = UUID.randomUUID().toString();
final BadRequestException badRequestExceptionWithMessage = new BadRequestException(exceptionMessage, new IOException());

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(badRequestsCounter, times(2)).increment();
}

@Test
public void testHandleTimeoutException() {
final BufferWriteException timeoutExceptionNoMessage = new BufferWriteException(null, new TimeoutException());
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException timeoutExceptionWithMessage = new BufferWriteException(exceptionMessage, new TimeoutException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleArmeriaTimeoutException() {
final RequestTimeoutException timeoutExceptionNoMessage = RequestTimeoutException.get();

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(ARMERIA_REQUEST_TIMEOUT_MESSAGE));

verify(requestTimeoutsCounter, times(1)).increment();
}

@Test
public void testHandleSizeOverflowException() {
final BufferWriteException sizeOverflowExceptionNoMessage = new BufferWriteException(null, new SizeOverflowException(null));
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException sizeOverflowExceptionWithMessage = new BufferWriteException(exceptionMessage, new SizeOverflowException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestsTooLargeCounter, times(2)).increment();
}

@Test
public void testHandleRequestCancelledException() {
final RequestCancelledException requestCancelledExceptionNoMessage = new RequestCancelledException(null);
final String exceptionMessage = UUID.randomUUID().toString();
final RequestCancelledException requestCancelledExceptionWithMessage = new RequestCancelledException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleInternalServerException() {
final RuntimeException runtimeExceptionNoMessage = new RuntimeException();
final String exceptionMessage = UUID.randomUUID().toString();
final RuntimeException runtimeExceptionWithMessage = new RuntimeException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(noMessageStatus.getDescription(), equalTo(DEFAULT_MESSAGE));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(internalServerErrorCounter, times(2)).increment();
}
}
Loading
Loading