Skip to content

Commit

Permalink
Merge pull request #8 from idealo/code_improvements
Browse files Browse the repository at this point in the history
[Scouting] Improve readability and efficiency
  • Loading branch information
marcus-j authored Oct 23, 2024
2 parents 04aaaf2 + b017953 commit 8f559c0
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class BeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
getProducerClasses()
.forEach(producerClass -> registerBean(registry, producerClass));
getProducerClasses().forEach(producerClass -> registerBean(registry, producerClass));
}

private Collection<Class<?>> getProducerClasses() {
Expand All @@ -49,15 +48,13 @@ private Class<?> getClass(final ClassInfo classInfo) {
private void registerBean(BeanDefinitionRegistry registry, Class<?> beanClass) {
String beanName = StringUtils.uncapitalize(beanClass.getSimpleName());

GenericBeanDefinition proxyBeanDefinition = new GenericBeanDefinition();
proxyBeanDefinition.setBeanClass(beanClass);

ConstructorArgumentValues args = new ConstructorArgumentValues();

args.addGenericArgumentValue(this.getClass().getClassLoader());
args.addGenericArgumentValue(beanClass);
proxyBeanDefinition.setConstructorArgumentValues(args);

GenericBeanDefinition proxyBeanDefinition = new GenericBeanDefinition();
proxyBeanDefinition.setBeanClass(beanClass);
proxyBeanDefinition.setConstructorArgumentValues(args);
proxyBeanDefinition.setFactoryBeanName(ProducerProxyBeanFactory.DEFAULT_FACTORY_BEAN_NAME);
proxyBeanDefinition.setFactoryMethodName("createBean");
proxyBeanDefinition.setDestroyMethodName("close");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@
@RequiredArgsConstructor
public class ProducerInvocationHandler<K, V> implements InvocationHandler {

private static final int MAX_NUMBER_OF_ARGUMENTS = 2;

private final Producer<K, V> producer;

@Override
public Object invoke(Object proxy, Method method, Object[] args) {
if ("send".equals(method.getName())) {
if (args.length == 1) {
producer.send((V) args[0]);
} else if (args.length == MAX_NUMBER_OF_ARGUMENTS) {
producer.send((K) args[0], (V) args[1]);
}
} else if ("sendEmpty".equals(method.getName())) {
producer.sendEmpty((K) args[0]);
} else if ("close".equals(method.getName())) {
producer.close();
switch (method.getName()) {
case "send":
if (args.length == 1) {
producer.send((V) args[0]);
} else {
producer.send((K) args[0], (V) args[1]);
}
break;
case "sendEmpty":
producer.sendEmpty((K) args[0]);
break;
case "close":
producer.close();
break;
}

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.lang.reflect.Proxy;
import java.util.Map;
Expand Down Expand Up @@ -94,7 +93,7 @@ private class ProducerDefinition<K, V, T extends GenericProducer<K, V>> {
}

private Serializer<V> encryptedIfConfigured(KafkaProducer kafkaProducer, Serializer<V> embeddedSerializer) {
if (!kafkaProducer.encryptionPassword().equals("") || !kafkaProducer.encryptionSalt().equals("")) {
if (hasText(kafkaProducer.encryptionPassword()) || hasText(kafkaProducer.encryptionSalt())) {
Assert.isTrue(isValidEncryptionSetup(kafkaProducer.encryptionPassword(), kafkaProducer.encryptionSalt()),
"Both password and salt have to be set.");
EmbeddedValueResolver embeddedValueResolver = new EmbeddedValueResolver(configurableBeanFactory);
Expand All @@ -108,17 +107,17 @@ private Serializer<V> encryptedIfConfigured(KafkaProducer kafkaProducer, Seriali
}

private boolean isValidEncryptionSetup(String password, String salt) {
return !StringUtils.isEmpty(password) && !StringUtils.isEmpty(salt);
return hasText(password) && hasText(salt);
}

private Serializer<V> createValueSerializerBean(KafkaProducer kafkaProducer, Map<String, Object> producerProperties) throws InstantiationException, IllegalAccessException {
return (Serializer<V>) retrieveValueSerializerClass(kafkaProducer)
.orElse((Class)producerProperties.get("value.serializer")).newInstance();
.orElse((Class) producerProperties.get("value.serializer")).newInstance();
}

private Serializer<K> createKeySerializerBean(KafkaProducer kafkaProducer, Map<String, Object> producerProperties) throws InstantiationException, IllegalAccessException {
return (Serializer<K>) retrieveKeySerializerClass(kafkaProducer)
.orElse((Class)producerProperties.get("key.serializer")).newInstance();
.orElse((Class) producerProperties.get("key.serializer")).newInstance();
}

private String retrieveTopic(Class<T> producerClass, final KafkaProducer kafkaProducer) {
Expand Down Expand Up @@ -180,15 +179,15 @@ private boolean isValueSerializerDefined(final KafkaProducer kafkaProducer) {
}

private boolean isValueSerializerBeanDefined(final KafkaProducer kafkaProducer) {
return nonNull(kafkaProducer) && !kafkaProducer.valueSerializerBean().equals("");
return nonNull(kafkaProducer) && hasText(kafkaProducer.valueSerializerBean());
}

private boolean keySerializerDefined(final KafkaProducer kafkaProducer) {
return nonNull(kafkaProducer) && !kafkaProducer.keySerializer().equals(KafkaProducer.DefaultSerializer.class);
}

private boolean isKeySerializerBeanDefined(final KafkaProducer kafkaProducer) {
return nonNull(kafkaProducer) && !kafkaProducer.keySerializerBean().equals("");
return nonNull(kafkaProducer) && hasText(kafkaProducer.keySerializerBean());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
@Target(ElementType.TYPE)
public @interface KafkaProducer {

String id() default "";
String id() default "";

String topic();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setUp() {
void shouldUseBootstrapServersFromAnnotation() {
customProducer.send(23L, 42);

ConsumerRecords<Long, Integer> records = customConsumer.poll(100);
ConsumerRecords<Long, Integer> records = customConsumer.poll(Duration.ofMillis(1000));
assertThat(records).hasSize(1);
stream(records.spliterator(), false).forEach(record -> {
assertThat(record.key()).isEqualTo(23L);
Expand Down

0 comments on commit 8f559c0

Please sign in to comment.