Skip to content

Commit

Permalink
FMWK-459 Fix requiring id property from entities by AerospikeCache (#752
Browse files Browse the repository at this point in the history
)

* id is not required for entities in cached methods
* use hashCode userKey instead of String in AerospikeCache, add tests
* make serialization and hashing methods overridable
* create AerospikeCacheKeyProcessor bean to enable overriding it with a different implementation if needed
  • Loading branch information
agrgr authored Jun 20, 2024
1 parent 4f1c727 commit 2d2af38
Show file tree
Hide file tree
Showing 13 changed files with 560 additions and 72 deletions.
28 changes: 26 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
<logback.test>1.5.6</logback.test>
<hibernate.validator>8.0.1.Final</hibernate.validator>
<netty.version>4.1.110.Final</netty.version>
<kryo.version>5.6.0</kryo.version>
<apacheCommonsCodec.version>1.17.0</apacheCommonsCodec.version>
</properties>

<licenses>
Expand Down Expand Up @@ -210,6 +212,16 @@
<artifactId>lombok</artifactId>
<version>${lombok}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${apacheCommonsCodec.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -243,6 +255,14 @@
<artifactId>aerospike-reactor-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down Expand Up @@ -392,9 +412,13 @@
<include>**/*Test.java</include>
<include>**/*Tests.java</include>
</includes>
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.math=ALL-UNNAMED
<argLine>
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED</argLine>
--add-opens java.base/java.math=ALL-UNNAMED
--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
Expand Down
12 changes: 8 additions & 4 deletions src/main/asciidoc/reference/caching.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ public class AerospikeConfiguration {
public AerospikeClient aerospikeClient() {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.failIfNotConnected = true;
return new AerospikeClient(clientPolicy, aerospikeConfigurationProperties.getHost(), aerospikeConfigurationProperties.getPort());
return new AerospikeClient(clientPolicy, aerospikeConfigurationProperties.getHost(),
aerospikeConfigurationProperties.getPort());
}
@Bean
public AerospikeCacheManager cacheManager(AerospikeClient aerospikeClient) {
public CacheManager cacheManager(IAerospikeClient aerospikeClient,
MappingAerospikeConverter aerospikeConverter,
AerospikeCacheKeyProcessor cacheKeyProcessor) {
AerospikeCacheConfiguration defaultConfiguration = new AerospikeCacheConfiguration("test");
return new AerospikeCacheManager(aerospikeClient, mappingAerospikeConverter, defaultConfiguration);
return new AerospikeCacheManager(aerospikeClient, mappingAerospikeConverter, defaultConfiguration,
cacheKeyProcessor);
}
}
----
Expand All @@ -96,7 +100,7 @@ The heart of the cache layer, to define an AerospikeCacheManager you need:
. aerospikeConverter (MappingAerospikeConverter).
. defaultCacheConfiguration (AerospikeCacheConfiguration), a default cache configuration that applies when creating new caches.
Cache configuration contains a namespace, a set (null by default meaning write directly to the namespace w/o specifying a set) and an expirationInSeconds (AKA TTL, default is 0 meaning use Aerospike server’s default).
. Optional: initalPerCacheConfiguration (Map<String, AerospikeCacheConfiguration>), You can also specify a map of cache names and matching configuration, it will create the caches with the given matching configuration at the application startup.
. Optional: initialPerCacheConfiguration (Map<String, AerospikeCacheConfiguration>), You can also specify a map of cache names and matching configuration, it will create the caches with the given matching configuration at the application startup.

NOTE: A cache name is only a link to the cache configuration.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public class AerospikeCache implements Cache {
private final AerospikeCacheConfiguration cacheConfiguration;
private final WritePolicy createOnly;
private final WritePolicy writePolicyForPut;
private final AerospikeCacheKeyProcessor cacheKeyProcessor;

public AerospikeCache(String name,
IAerospikeClient client,
AerospikeConverter aerospikeConverter,
AerospikeCacheConfiguration cacheConfiguration) {
AerospikeCacheConfiguration cacheConfiguration,
AerospikeCacheKeyProcessor cacheKeyProcessor) {
this.name = name;
this.client = client;
this.aerospikeConverter = aerospikeConverter;
Expand All @@ -62,6 +64,7 @@ public AerospikeCache(String name,
this.writePolicyForPut = WritePolicyBuilder.builder(client.getWritePolicyDefault())
.expiration(cacheConfiguration.getExpirationInSeconds())
.build();
this.cacheKeyProcessor = cacheKeyProcessor;
}

/**
Expand Down Expand Up @@ -225,12 +228,16 @@ public ValueWrapper putIfAbsent(Object key, Object value) {
}

private Key getKey(Object key) {
return new Key(cacheConfiguration.getNamespace(), cacheConfiguration.getSet(), key.toString());
AerospikeCacheKey cacheKey = cacheKeyProcessor.serializeAndHash(key);
return new Key(cacheConfiguration.getNamespace(), cacheConfiguration.getSet(),
cacheKey.getValue());
}

private void serializeAndPut(WritePolicy writePolicy, Object key, Object value) {
AerospikeWriteData data = AerospikeWriteData.forWrite(getKey(key).namespace);
AerospikeWriteData data = AerospikeWriteData.forWrite(cacheConfiguration.getNamespace());
Key aerospikeKey = getKey(key);
data.setKey(aerospikeKey); // Set the key on the data object
aerospikeConverter.write(value, data);
client.put(writePolicy, getKey(key), data.getBinsAsArray());
client.put(writePolicy, aerospikeKey, data.getBinsAsArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.springframework.data.aerospike.cache;

import com.aerospike.client.Value;
import lombok.Getter;

/**
* Wrapper class used in caching. Receives hash of the cache key as a String, a long number or a byte array.
*/
public class AerospikeCacheKey {

@Getter
private final Value value;

private AerospikeCacheKey(String string) {
this.value = new Value.StringValue(string);
}

private AerospikeCacheKey(long number) {
this.value = new Value.LongValue(number);
}

private AerospikeCacheKey(byte[] data) {
this.value = new Value.BytesValue(data);
}

/**
* Instantiate AerospikeCacheKey instance with a String.
*
* @param string String parameter
* @return new instance of AerospikeCacheKey initialized with the given parameter
*/
public static AerospikeCacheKey of(String string) {
return new AerospikeCacheKey(string);
}

/**
* Instantiate AerospikeCacheKey instance with a long number.
*
* @param number long number
* @return new instance of AerospikeCacheKey initialized with the given parameter
*/
public static AerospikeCacheKey of(long number) {
return new AerospikeCacheKey(number);
}

/**
* Instantiate AerospikeCacheKey instance with a byte array.
*
* @param data byte array
* @return new instance of AerospikeCacheKey initialized with the given parameter
*/
public static AerospikeCacheKey of(byte[] data) {
return new AerospikeCacheKey(data);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.springframework.data.aerospike.cache;

/**
* Interface that provides methods used in caching
*/
public interface AerospikeCacheKeyProcessor {

/**
* Serialize the given key and calculate hash based on the serialization result.
*
* @param key Object to be serialized and hashed
* @return AerospikeCacheKey instantiated with either a String or a long number
*/
AerospikeCacheKey serializeAndHash(Object key);

/**
* Serialize the given key.
* <p>
* The default implementation uses Kryo.
*
* @param key Object to be serialized
* @return byte[]
*/
byte[] serialize(Object key);

/**
* Calculate hash based on the given byte array.
* <p>
* The default implementation uses 128 bit Murmur3 hashing.
*
* @param data Byte array to be hashed
* @return AerospikeCacheKey instantiated with either a String or a long number
*/
AerospikeCacheKey calculateHash(byte[] data);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.springframework.data.aerospike.cache;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import lombok.Getter;
import org.apache.commons.codec.digest.MurmurHash3;
import org.objenesis.strategy.StdInstantiatorStrategy;

import java.util.Arrays;

public class AerospikeCacheKeyProcessorImpl implements AerospikeCacheKeyProcessor {

@Getter
private final Kryo kryoInstance = new Kryo();

public AerospikeCacheKeyProcessorImpl() {
configureKryo();
}

/**
* Configuration for Kryo instance.
* <p>
* Classes of the objects to be cached can be pre-registered if required. Registering in advance is not necessary,
* however it can be done to increase serialization performance. If a class has been pre-registered, the first time
* it is encountered Kryo can just output a numeric reference to it instead of writing fully qualified class name.
*/
public void configureKryo() {
// setting to false means not requiring registration for all the classes of cached objects in advance
getKryoInstance().setRegistrationRequired(false);
getKryoInstance().setInstantiatorStrategy(new StdInstantiatorStrategy());
}

public AerospikeCacheKey serializeAndHash(Object key) {
return calculateHash(serialize(key));
}

public byte[] serialize(Object key) {
ByteBufferOutput output = new ByteBufferOutput(1024); // Initial buffer size
kryoInstance.writeClassAndObject(output, key);
output.flush();
return output.toBytes();
}

public AerospikeCacheKey calculateHash(byte[] data) {
return AerospikeCacheKey.of(Arrays.toString(MurmurHash3.hash128(data)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AerospikeCacheManager extends AbstractTransactionSupportingCacheMan
private final AerospikeConverter aerospikeConverter;
private final AerospikeCacheConfiguration defaultCacheConfiguration;
private final Map<String, AerospikeCacheConfiguration> initialPerCacheConfiguration;
private final AerospikeCacheKeyProcessor cacheKeyProcessor;

/**
* Create a new {@link AerospikeCacheManager} instance - Specifying a default cache configuration.
Expand All @@ -55,8 +56,9 @@ public class AerospikeCacheManager extends AbstractTransactionSupportingCacheMan
*/
public AerospikeCacheManager(IAerospikeClient aerospikeClient,
AerospikeConverter aerospikeConverter,
AerospikeCacheConfiguration defaultCacheConfiguration) {
this(aerospikeClient, aerospikeConverter, defaultCacheConfiguration, new LinkedHashMap<>());
AerospikeCacheConfiguration defaultCacheConfiguration,
AerospikeCacheKeyProcessor cacheKeyProcessor) {
this(aerospikeClient, aerospikeConverter, defaultCacheConfiguration, new LinkedHashMap<>(), cacheKeyProcessor);
}

/**
Expand All @@ -71,7 +73,8 @@ public AerospikeCacheManager(IAerospikeClient aerospikeClient,
public AerospikeCacheManager(IAerospikeClient aerospikeClient,
AerospikeConverter aerospikeConverter,
AerospikeCacheConfiguration defaultCacheConfiguration,
Map<String, AerospikeCacheConfiguration> initialPerCacheConfiguration) {
Map<String, AerospikeCacheConfiguration> initialPerCacheConfiguration,
AerospikeCacheKeyProcessor cacheKeyProcessor) {
Assert.notNull(aerospikeClient, "The aerospike client must not be null");
Assert.notNull(aerospikeConverter, "The aerospike converter must not be null");
Assert.notNull(defaultCacheConfiguration, "The default cache configuration must not be null");
Expand All @@ -80,6 +83,7 @@ public AerospikeCacheManager(IAerospikeClient aerospikeClient,
this.aerospikeConverter = aerospikeConverter;
this.defaultCacheConfiguration = defaultCacheConfiguration;
this.initialPerCacheConfiguration = initialPerCacheConfiguration;
this.cacheKeyProcessor = cacheKeyProcessor;
}

@Override
Expand All @@ -105,11 +109,12 @@ protected Cache decorateCache(Cache cache) {
}

private AerospikeCache createCache(String name) {
return new AerospikeCache(name, aerospikeClient, aerospikeConverter, defaultCacheConfiguration);
return new AerospikeCache(name, aerospikeClient, aerospikeConverter, defaultCacheConfiguration,
cacheKeyProcessor);
}

private AerospikeCache createCache(String name, AerospikeCacheConfiguration cacheConfiguration) {
return new AerospikeCache(name, aerospikeClient, aerospikeConverter, cacheConfiguration);
return new AerospikeCache(name, aerospikeClient, aerospikeConverter, cacheConfiguration, cacheKeyProcessor);
}

private boolean isCacheAlreadyDecorated(Cache cache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.data.aerospike.cache.AerospikeCacheKeyProcessor;
import org.springframework.data.aerospike.cache.AerospikeCacheKeyProcessorImpl;
import org.springframework.data.aerospike.convert.AerospikeCustomConversions;
import org.springframework.data.aerospike.convert.AerospikeTypeAliasAccessor;
import org.springframework.data.aerospike.convert.MappingAerospikeConverter;
Expand Down Expand Up @@ -73,6 +75,11 @@ public IndexesCacheHolder indexCache() {
return new IndexesCacheHolder();
}

@Bean(name = "aerospikeCacheKeyProcessor")
public AerospikeCacheKeyProcessor cacheKeyProcessor() {
return new AerospikeCacheKeyProcessorImpl();
}

@Bean(name = "mappingAerospikeConverter")
public MappingAerospikeConverter mappingAerospikeConverter(AerospikeMappingContext aerospikeMappingContext,
AerospikeTypeAliasAccessor aerospikeTypeAliasAccessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.data.aerospike.cache.AerospikeCacheKeyProcessor;
import org.springframework.data.aerospike.config.BlockingTestConfig;
import org.springframework.data.aerospike.config.CommonTestConfig;
import org.springframework.data.aerospike.config.IndexedBinsAnnotationsProcessor;
Expand Down Expand Up @@ -63,6 +64,8 @@ public abstract class BaseBlockingIntegrationTests extends BaseIntegrationTests
protected Environment env;
@Autowired
protected MappingContext<BasicAerospikePersistentEntity<?>, AerospikePersistentProperty> mappingContext;
@Autowired
protected AerospikeCacheKeyProcessor cacheKeyProcessor;

protected <T> void deleteOneByOne(Collection<T> collection) {
collection.forEach(item -> template.delete(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ public abstract class BaseIntegrationTests {

public static final String DEFAULT_SET_NAME = "aerospike";
public static final String OVERRIDE_SET_NAME = "testSet1";
public static final String DIFFERENT_SET_NAME = "different-set";
public static final String CACHE_WITH_TTL = "CACHE-WITH-TTL";
public static final String DIFFERENT_EXISTING_CACHE = "DIFFERENT-EXISTING-CACHE";
protected static final int MILLIS_TO_NANO = 1_000_000;

@Value("${spring-data-aerospike.connection.namespace}")
Expand Down
Loading

0 comments on commit 2d2af38

Please sign in to comment.