Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize exception handling and fix behavior for RequestTimeoutException #3063

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
public static final String INTERNAL_SERVER_ERROR = "internalServerError";

private final Counter requestTimeoutsCounter;
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
}

internalServerErrorCounter.increment();
return createStatus(e, Status.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
}

return status.withDescription(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -13,10 +17,11 @@
import io.micrometer.core.instrument.Counter;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class RequestExceptionHandler {
public class HttpRequestExceptionHandler implements ExceptionHandlerFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
Expand All @@ -27,31 +32,39 @@ public class RequestExceptionHandler {
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public RequestExceptionHandler(final PluginMetrics pluginMetrics) {
public HttpRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

public HttpResponse handleException(final Exception e) {
final String message = e.getMessage() == null? "" : e.getMessage();
return handleException(e, message);
@Override
public HttpResponse handleException(final ServiceRequestContext ctx, final HttpRequest req, final Throwable cause) {
final HttpStatus status = handleException(cause);
final String message;
if (cause instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = cause.getMessage() == null ? status.reasonPhrase() : cause.getMessage();
}

return HttpResponse.of(status, MediaType.ANY_TYPE, message);
}

public HttpResponse handleException(final Exception e, final String message) {
Objects.requireNonNull(message);
private HttpStatus handleException(final Throwable e) {
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, message);
} else if (e instanceof TimeoutException) {
return HttpStatus.BAD_REQUEST;
} else if (e instanceof TimeoutException || e instanceof RequestTimeoutException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between TimeoutException and RequestTimeoutException?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeoutException is thrown by the BlockingBuffer, RequestTimeoutException is thrown by Armeria when the requestTimeout has expired

requestTimeoutsCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_TIMEOUT;
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_ENTITY_TOO_LARGE, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_ENTITY_TOO_LARGE;
}

internalServerErrorCounter.increment();
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, MediaType.ANY_TYPE, message);
return HttpStatus.INTERNAL_SERVER_ERROR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BadRequestException extends RuntimeException {
public BadRequestException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BufferWriteException extends RuntimeException {
public BufferWriteException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class RequestCancelledException extends RuntimeException {
public RequestCancelledException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.ARMERIA_REQUEST_TIMEOUT_MESSAGE;

@ExtendWith(MockitoExtension.class)
public class GrpcRequestExceptionHandlerTest {
@Mock
private PluginMetrics pluginMetrics;

@Mock
private Counter requestTimeoutsCounter;

@Mock
private Counter badRequestsCounter;

@Mock
private Counter requestsTooLargeCounter;

@Mock
private Counter internalServerErrorCounter;

@Mock
private RequestContext requestContext;

@Mock
private Metadata metadata;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
}

@Test
public void testHandleBadRequestException() {
final BadRequestException badRequestExceptionNoMessage = new BadRequestException(null, new IOException());
final String exceptionMessage = UUID.randomUUID().toString();
final BadRequestException badRequestExceptionWithMessage = new BadRequestException(exceptionMessage, new IOException());

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.INVALID_ARGUMENT.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(badRequestsCounter, times(2)).increment();
}

@Test
public void testHandleTimeoutException() {
final BufferWriteException timeoutExceptionNoMessage = new BufferWriteException(null, new TimeoutException());
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException timeoutExceptionWithMessage = new BufferWriteException(exceptionMessage, new TimeoutException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.RESOURCE_EXHAUSTED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleArmeriaTimeoutException() {
final RequestTimeoutException timeoutExceptionNoMessage = RequestTimeoutException.get();

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(ARMERIA_REQUEST_TIMEOUT_MESSAGE));

verify(requestTimeoutsCounter, times(1)).increment();
}

@Test
public void testHandleSizeOverflowException() {
final BufferWriteException sizeOverflowExceptionNoMessage = new BufferWriteException(null, new SizeOverflowException(null));
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException sizeOverflowExceptionWithMessage = new BufferWriteException(exceptionMessage, new SizeOverflowException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.RESOURCE_EXHAUSTED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestsTooLargeCounter, times(2)).increment();
}

@Test
public void testHandleRequestCancelledException() {
final RequestCancelledException requestCancelledExceptionNoMessage = new RequestCancelledException(null);
final String exceptionMessage = UUID.randomUUID().toString();
final RequestCancelledException requestCancelledExceptionWithMessage = new RequestCancelledException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.CANCELLED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleInternalServerException() {
final RuntimeException runtimeExceptionNoMessage = new RuntimeException();
final String exceptionMessage = UUID.randomUUID().toString();
final RuntimeException runtimeExceptionWithMessage = new RuntimeException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.INTERNAL.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(internalServerErrorCounter, times(2)).increment();
}
}
Loading
Loading