Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caching implementation of EventKeyFactory #4843

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import org.opensearch.dataprepper.core.event.EventConfiguration;
import org.opensearch.dataprepper.core.event.EventConfigurationContainer;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.parser.config.MetricTagFilter;
Expand All @@ -29,7 +31,7 @@
/**
* Class to hold configuration for DataPrepper, including server port and Log4j settings
*/
public class DataPrepperConfiguration implements ExtensionsConfiguration {
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";
Expand All @@ -47,6 +49,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration {
private CircuitBreakerConfig circuitBreakerConfig;
private SourceCoordinationConfig sourceCoordinationConfig;
private PipelineShutdownOption pipelineShutdown;
private EventConfiguration eventConfiguration;
private Map<String, String> metricTags = new HashMap<>();
private List<MetricTagFilter> metricTagFilters = new LinkedList<>();
private PeerForwarderConfiguration peerForwarderConfiguration;
Expand Down Expand Up @@ -92,6 +95,7 @@ public DataPrepperConfiguration(
@JsonProperty("circuit_breakers") final CircuitBreakerConfig circuitBreakerConfig,
@JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig,
@JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown,
@JsonProperty("event") final EventConfiguration eventConfiguration,
@JsonProperty("extensions")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
Expand All @@ -102,6 +106,7 @@ public DataPrepperConfiguration(
? new SourceCoordinationConfig(new PluginModel(DEFAULT_SOURCE_COORDINATION_STORE, Collections.emptyMap()), null)
: sourceCoordinationConfig;
this.pipelineShutdown = pipelineShutdown != null ? pipelineShutdown : DEFAULT_PIPELINE_SHUTDOWN;
this.eventConfiguration = eventConfiguration != null ? eventConfiguration : EventConfiguration.defaultConfiguration();
setSsl(ssl);
this.keyStoreFilePath = keyStoreFilePath != null ? keyStoreFilePath : "";
this.keyStorePassword = keyStorePassword != null ? keyStorePassword : "";
Expand Down Expand Up @@ -226,6 +231,10 @@ public PipelineShutdownOption getPipelineShutdown() {
return pipelineShutdown;
}

public EventConfiguration getEventConfiguration() {
return eventConfiguration;
}

@Override
public PipelineExtensions getPipelineExtensions() {
return pipelineExtensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ void setUp() {

@AfterEach
void tearDown() {
verify(dataPrepperConfiguration).getEventConfiguration();
verifyNoMoreInteractions(dataPrepperConfiguration);
}

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-event/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ dependencies {
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation libs.caffeine
testImplementation libs.commons.lang3
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Objects;

class CachingEventKeyFactory implements EventKeyFactory {
private static final Logger log = LoggerFactory.getLogger(CachingEventKeyFactory.class);
private final EventKeyFactory delegateEventKeyFactory;
private final Cache<CacheKey, EventKey> cache;

private static class CacheKey {
private final String key;
private final EventAction[] eventActions;

private CacheKey(final String key, final EventAction[] eventActions) {
this.key = key;
this.eventActions = eventActions;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final CacheKey cacheKey = (CacheKey) o;
return Objects.equals(key, cacheKey.key) && Arrays.equals(eventActions, cacheKey.eventActions);
}

@Override
public int hashCode() {
int result = Objects.hash(key);
result = 31 * result + Arrays.hashCode(eventActions);
return result;
}
}

CachingEventKeyFactory(final EventKeyFactory delegateEventKeyFactory, final EventConfiguration eventConfiguration) {
Objects.requireNonNull(delegateEventKeyFactory);
Objects.requireNonNull(eventConfiguration);

log.debug("Configured to cache a maximum of {} event keys.", eventConfiguration.getMaximumCachedKeys());

this.delegateEventKeyFactory = delegateEventKeyFactory;
cache = Caffeine.newBuilder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I was not familiar with this library before

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caffeine is preferred over the Guava cache. And we actually have a Checkstyle to prevent use of the Guava cache.

.maximumSize(eventConfiguration.getMaximumCachedKeys())
.build();
}

@Override
public EventKey createEventKey(final String key, final EventAction... forActions) {
return getOrCreateEventKey(new CacheKey(key, forActions));
}

private EventKey getOrCreateEventKey(final CacheKey cacheKey) {
return cache.asMap().computeIfAbsent(cacheKey, key -> delegateEventKeyFactory.createEventKey(key.key, key.eventActions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.InternalOnlyEventKeyBridge;

import javax.inject.Named;

@Named
public class DefaultEventKeyFactory implements EventKeyFactory {
class DefaultEventKeyFactory implements EventKeyFactory {
@Override
public EventKey createEventKey(final String key, final EventAction... forActions) {
return InternalOnlyEventKeyBridge.createEventKey(key, forActions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Data Prepper configurations for events.
*/
public class EventConfiguration {
@JsonProperty("maximum_cached_keys")
private Integer maximumCachedKeys = 512;

public static EventConfiguration defaultConfiguration() {
return new EventConfiguration();
}

/**
* Gets the maximum number of cached {@link org.opensearch.dataprepper.model.event.EventKey} objects.
*
* @return the cache maximum count
*/
Integer getMaximumCachedKeys() {
return maximumCachedKeys;
}

void setMaximumCachedKeys(final Integer maximumCachedKeys) {
this.maximumCachedKeys = maximumCachedKeys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

public interface EventConfigurationContainer {
EventConfiguration getEventConfiguration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.inject.Named;

@Configuration
class EventFactoryApplicationConfiguration {
@Bean
EventConfiguration eventConfiguration(@Autowired(required = false) final EventConfigurationContainer eventConfigurationContainer) {
if(eventConfigurationContainer == null || eventConfigurationContainer.getEventConfiguration() == null)
return EventConfiguration.defaultConfiguration();
return eventConfigurationContainer.getEventConfiguration();
}

@Bean(name = "innerEventKeyFactory")
EventKeyFactory innerEventKeyFactory() {
return new DefaultEventKeyFactory();
}

@Primary
@Bean(name = "eventKeyFactory")
EventKeyFactory eventKeyFactory(
@Named("innerEventKeyFactory") final EventKeyFactory eventKeyFactory,
final EventConfiguration eventConfiguration) {
if(eventConfiguration.getMaximumCachedKeys() <= 0) {
return eventKeyFactory;
}
return new CachingEventKeyFactory(eventKeyFactory, eventConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class CachingEventKeyFactoryTest {
private static final int CACHE_SIZE = 2;
@Mock
private EventKeyFactory innerEventKeyFactory;

@Mock
private EventConfiguration eventConfiguration;

@BeforeEach
void setUp() {
when(eventConfiguration.getMaximumCachedKeys()).thenReturn(CACHE_SIZE);
}

private EventKeyFactory createObjectUnderTest() {
return new CachingEventKeyFactory(innerEventKeyFactory, eventConfiguration);
}

@ParameterizedTest
@EnumSource(EventKeyFactory.EventAction.class)
void createEventKey_with_EventAction_returns_inner_createEventKey(final EventKeyFactory.EventAction eventAction) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, eventAction)).thenReturn(eventKey);

final EventKey actualEventKey = createObjectUnderTest().createEventKey(key, eventAction);
assertThat(actualEventKey, sameInstance(eventKey));
}

@Test
void createEventKey_returns_inner_createEventKey() {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this test testing? Since we are returning via when it will be same instance, right? Isn't it better to use the real createEventKey()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this test is verifying the behavior of createEventKey(String) in the CachingEventKeyFactory. The interface has a default implementation for createEventKey(final String key):

default EventKey createEventKey(final String key) {
return createEventKey(key, EventAction.ALL);
}

The CachingEventKeyFactory actually uses this default implementation, which then calls the custom implementation for caching.

The real implementations are being used in this test. Please note that I am not using a spy on the object under test. On this line, I have mocked the inner object. This is the same as on line 55 above. The difference is that because of the default implementation, the call comes in providing EventAction.ALL.

final EventKey actualEventKey = createObjectUnderTest().createEventKey(key);
assertThat(actualEventKey, sameInstance(eventKey));
}

@ParameterizedTest
@EnumSource(EventKeyFactory.EventAction.class)
void createEventKey_with_EventAction_returns_same_instance_without_calling_inner_createEventKey_for_same_key(final EventKeyFactory.EventAction eventAction) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, eventAction)).thenReturn(eventKey);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualKey = objectUnderTest.createEventKey(key, eventAction);
final EventKey actualKey2 = objectUnderTest.createEventKey(key, eventAction);

assertThat(actualKey, sameInstance(eventKey));
assertThat(actualKey2, sameInstance(eventKey));

verify(innerEventKeyFactory).createEventKey(key, eventAction);
}

@Test
void createEventKey_returns_same_instance_without_calling_inner_createEventKey_for_same_key() {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualKey = objectUnderTest.createEventKey(key);
final EventKey actualKey2 = objectUnderTest.createEventKey(key);

assertThat(actualKey, sameInstance(eventKey));
assertThat(actualKey2, sameInstance(eventKey));

verify(innerEventKeyFactory).createEventKey(key, EventKeyFactory.EventAction.ALL);
}

@Test
void createEventKey_with_EventAction_returns_different_values_for_different_keys() {
final String key1 = UUID.randomUUID().toString();
final String key2 = UUID.randomUUID().toString();
final EventKey eventKey1 = mock(EventKey.class);
final EventKey eventKey2 = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key1, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey1);
when(innerEventKeyFactory.createEventKey(key2, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey2);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualEventKey1 = objectUnderTest.createEventKey(key1, EventKeyFactory.EventAction.ALL);
assertThat(actualEventKey1, sameInstance(eventKey1));
final EventKey actualEventKey2 = objectUnderTest.createEventKey(key2, EventKeyFactory.EventAction.ALL);
assertThat(actualEventKey2, sameInstance(eventKey2));
}

@Test
void createEventKey_with_EventAction_returns_different_values_for_different_actions() {
final String key = UUID.randomUUID().toString();
final EventKey eventKeyGet = mock(EventKey.class);
final EventKey eventKeyPut = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.GET)).thenReturn(eventKeyGet);
when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.PUT)).thenReturn(eventKeyPut);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualEventKeyGet = objectUnderTest.createEventKey(key, EventKeyFactory.EventAction.GET);
assertThat(actualEventKeyGet, sameInstance(eventKeyGet));
final EventKey actualEventKeyPut = objectUnderTest.createEventKey(key, EventKeyFactory.EventAction.PUT);
assertThat(actualEventKeyPut, sameInstance(eventKeyPut));
}

@Test
void createEventKey_expires_after_reaching_maximum() {

final List<String> keys = new ArrayList<>(CACHE_SIZE);
for (int i = 0; i < CACHE_SIZE * 2; i++) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);
when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);
keys.add(key);
}

final EventKeyFactory objectUnderTest = createObjectUnderTest();

final int numberOfIterations = 20;
for (int i = 0; i < numberOfIterations; i++) {
for (final String key : keys) {
objectUnderTest.createEventKey(key);
}
}

verify(innerEventKeyFactory, atLeast(CACHE_SIZE + 1))
.createEventKey(anyString(), eq(EventKeyFactory.EventAction.ALL));
}
}
Loading
Loading