diff --git a/src/main/java/de/idealo/kafka/deckard/proxy/BeanDefinitionRegistrar.java b/src/main/java/de/idealo/kafka/deckard/proxy/BeanDefinitionRegistrar.java index afb3eb3..b553241 100644 --- a/src/main/java/de/idealo/kafka/deckard/proxy/BeanDefinitionRegistrar.java +++ b/src/main/java/de/idealo/kafka/deckard/proxy/BeanDefinitionRegistrar.java @@ -25,8 +25,7 @@ public class BeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { - getProducerClasses() - .forEach(producerClass -> registerBean(registry, producerClass)); + getProducerClasses().forEach(producerClass -> registerBean(registry, producerClass)); } private Collection> getProducerClasses() { @@ -49,15 +48,13 @@ private Class getClass(final ClassInfo classInfo) { private void registerBean(BeanDefinitionRegistry registry, Class beanClass) { String beanName = StringUtils.uncapitalize(beanClass.getSimpleName()); - GenericBeanDefinition proxyBeanDefinition = new GenericBeanDefinition(); - proxyBeanDefinition.setBeanClass(beanClass); - ConstructorArgumentValues args = new ConstructorArgumentValues(); - args.addGenericArgumentValue(this.getClass().getClassLoader()); args.addGenericArgumentValue(beanClass); - proxyBeanDefinition.setConstructorArgumentValues(args); + GenericBeanDefinition proxyBeanDefinition = new GenericBeanDefinition(); + proxyBeanDefinition.setBeanClass(beanClass); + proxyBeanDefinition.setConstructorArgumentValues(args); proxyBeanDefinition.setFactoryBeanName(ProducerProxyBeanFactory.DEFAULT_FACTORY_BEAN_NAME); proxyBeanDefinition.setFactoryMethodName("createBean"); proxyBeanDefinition.setDestroyMethodName("close"); diff --git a/src/main/java/de/idealo/kafka/deckard/proxy/ProducerInvocationHandler.java b/src/main/java/de/idealo/kafka/deckard/proxy/ProducerInvocationHandler.java index 9d159c6..2664514 100644 --- a/src/main/java/de/idealo/kafka/deckard/proxy/ProducerInvocationHandler.java +++ b/src/main/java/de/idealo/kafka/deckard/proxy/ProducerInvocationHandler.java @@ -9,24 +9,25 @@ @RequiredArgsConstructor public class ProducerInvocationHandler implements InvocationHandler { - private static final int MAX_NUMBER_OF_ARGUMENTS = 2; - private final Producer producer; @Override public Object invoke(Object proxy, Method method, Object[] args) { - if ("send".equals(method.getName())) { - if (args.length == 1) { - producer.send((V) args[0]); - } else if (args.length == MAX_NUMBER_OF_ARGUMENTS) { - producer.send((K) args[0], (V) args[1]); - } - } else if ("sendEmpty".equals(method.getName())) { - producer.sendEmpty((K) args[0]); - } else if ("close".equals(method.getName())) { - producer.close(); + switch (method.getName()) { + case "send": + if (args.length == 1) { + producer.send((V) args[0]); + } else { + producer.send((K) args[0], (V) args[1]); + } + break; + case "sendEmpty": + producer.sendEmpty((K) args[0]); + break; + case "close": + producer.close(); + break; } - return 0; } } \ No newline at end of file diff --git a/src/main/java/de/idealo/kafka/deckard/proxy/ProducerProxyBeanFactory.java b/src/main/java/de/idealo/kafka/deckard/proxy/ProducerProxyBeanFactory.java index d94b0e2..afb8e31 100644 --- a/src/main/java/de/idealo/kafka/deckard/proxy/ProducerProxyBeanFactory.java +++ b/src/main/java/de/idealo/kafka/deckard/proxy/ProducerProxyBeanFactory.java @@ -15,7 +15,6 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; import java.lang.reflect.Proxy; import java.util.Map; @@ -94,7 +93,7 @@ private class ProducerDefinition> { } private Serializer encryptedIfConfigured(KafkaProducer kafkaProducer, Serializer embeddedSerializer) { - if (!kafkaProducer.encryptionPassword().equals("") || !kafkaProducer.encryptionSalt().equals("")) { + if (hasText(kafkaProducer.encryptionPassword()) || hasText(kafkaProducer.encryptionSalt())) { Assert.isTrue(isValidEncryptionSetup(kafkaProducer.encryptionPassword(), kafkaProducer.encryptionSalt()), "Both password and salt have to be set."); EmbeddedValueResolver embeddedValueResolver = new EmbeddedValueResolver(configurableBeanFactory); @@ -108,17 +107,17 @@ private Serializer encryptedIfConfigured(KafkaProducer kafkaProducer, Seriali } private boolean isValidEncryptionSetup(String password, String salt) { - return !StringUtils.isEmpty(password) && !StringUtils.isEmpty(salt); + return hasText(password) && hasText(salt); } private Serializer createValueSerializerBean(KafkaProducer kafkaProducer, Map producerProperties) throws InstantiationException, IllegalAccessException { return (Serializer) retrieveValueSerializerClass(kafkaProducer) - .orElse((Class)producerProperties.get("value.serializer")).newInstance(); + .orElse((Class) producerProperties.get("value.serializer")).newInstance(); } private Serializer createKeySerializerBean(KafkaProducer kafkaProducer, Map producerProperties) throws InstantiationException, IllegalAccessException { return (Serializer) retrieveKeySerializerClass(kafkaProducer) - .orElse((Class)producerProperties.get("key.serializer")).newInstance(); + .orElse((Class) producerProperties.get("key.serializer")).newInstance(); } private String retrieveTopic(Class producerClass, final KafkaProducer kafkaProducer) { @@ -180,7 +179,7 @@ private boolean isValueSerializerDefined(final KafkaProducer kafkaProducer) { } private boolean isValueSerializerBeanDefined(final KafkaProducer kafkaProducer) { - return nonNull(kafkaProducer) && !kafkaProducer.valueSerializerBean().equals(""); + return nonNull(kafkaProducer) && hasText(kafkaProducer.valueSerializerBean()); } private boolean keySerializerDefined(final KafkaProducer kafkaProducer) { @@ -188,7 +187,7 @@ private boolean keySerializerDefined(final KafkaProducer kafkaProducer) { } private boolean isKeySerializerBeanDefined(final KafkaProducer kafkaProducer) { - return nonNull(kafkaProducer) && !kafkaProducer.keySerializerBean().equals(""); + return nonNull(kafkaProducer) && hasText(kafkaProducer.keySerializerBean()); } } } diff --git a/src/main/java/de/idealo/kafka/deckard/stereotype/KafkaProducer.java b/src/main/java/de/idealo/kafka/deckard/stereotype/KafkaProducer.java index d5586e2..2b19af6 100644 --- a/src/main/java/de/idealo/kafka/deckard/stereotype/KafkaProducer.java +++ b/src/main/java/de/idealo/kafka/deckard/stereotype/KafkaProducer.java @@ -9,7 +9,7 @@ @Target(ElementType.TYPE) public @interface KafkaProducer { - String id() default ""; + String id() default ""; String topic(); diff --git a/src/test/java/de/idealo/kafka/deckard/proxy/CustomBootstrapServersIT.java b/src/test/java/de/idealo/kafka/deckard/proxy/CustomBootstrapServersIT.java index c2e8ffa..e2f26e6 100644 --- a/src/test/java/de/idealo/kafka/deckard/proxy/CustomBootstrapServersIT.java +++ b/src/test/java/de/idealo/kafka/deckard/proxy/CustomBootstrapServersIT.java @@ -74,7 +74,7 @@ public void setUp() { void shouldUseBootstrapServersFromAnnotation() { customProducer.send(23L, 42); - ConsumerRecords records = customConsumer.poll(100); + ConsumerRecords records = customConsumer.poll(Duration.ofMillis(1000)); assertThat(records).hasSize(1); stream(records.spliterator(), false).forEach(record -> { assertThat(record.key()).isEqualTo(23L);