diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java
index ce413f1c10..b122e73d23 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfig.java
@@ -19,13 +19,6 @@ public class PlainTextAuthConfig {
     @JsonProperty("password")
     private String password;
 
-    @JsonProperty("security_protocol")
-    private String securityProtocol;
-
-    public String getSecurityProtocol() {
-        return securityProtocol;
-    }
-
     public String getUsername() {
         return username;
     }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java
deleted file mode 100644
index 826bc39d61..0000000000
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulator.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.dataprepper.plugins.kafka.source;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.micrometer.core.instrument.Counter;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
-import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.log.JacksonLog;
-import org.opensearch.dataprepper.model.record.Record;
-import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
-import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
-import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * * A helper utility class which helps to write different formats of records
- * like json, avro and plaintext to the buffer.
- */
-@SuppressWarnings("deprecation")
-public class KafkaSourceBufferAccumulator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBufferAccumulator.class);
-    private static final String MESSAGE_KEY = "message";
-    private final TopicConfig topicConfig;
-    private final KafkaSourceConfig kafkaSourceConfig;
-    private final String schemaType;
-    private PluginMetrics pluginMetrics;
-    private final Counter kafkaConsumerWriteError;
-    private static final String KAFKA_CONSUMER_BUFFER_WRITE_ERROR = "kafkaConsumerBufferWriteError";
-    private static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = Integer.MAX_VALUE;
-    private static final Duration INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION = Duration.ofSeconds(5);
-    private final JsonFactory jsonFactory = new JsonFactory();
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-    private static final Long COMMIT_OFFSET_INTERVAL_MILLI_SEC = 300000L;
-
-    public KafkaSourceBufferAccumulator(final TopicConfig topicConfigs,
-                                        final KafkaSourceConfig kafkaSourceConfig,
-                                        final String schemaType, PluginMetrics pluginMetric) {
-        this.kafkaSourceConfig = kafkaSourceConfig;
-        this.topicConfig = topicConfigs;
-        this.schemaType = schemaType;
-        this.pluginMetrics = pluginMetric;
-        this.kafkaConsumerWriteError = pluginMetrics.counter(KAFKA_CONSUMER_BUFFER_WRITE_ERROR);
-    }
-
-    public Record getEventRecord(final String line) {
-        Map message = new HashMap<>();
-        MessageFormat format = MessageFormat.getByMessageFormatByName(schemaType);
-        if (format.equals(MessageFormat.JSON) || format.equals(MessageFormat.AVRO)) {
-            try {
-                final JsonParser jsonParser = jsonFactory.createParser(line);
-                message = objectMapper.readValue(jsonParser, Map.class);
-            } catch (Exception e) {
-                LOG.error("Unable to parse json data [{}]", line, e);
-                message.put(MESSAGE_KEY, line);
-            }
-        } else{
-            message.put(MESSAGE_KEY, line);
-        }
-        Event event = JacksonLog.builder().withData(message).build();
-        return new Record<>(event);
-    }
-
-    public void write(List> kafkaRecords, final Buffer> buffer) throws Exception {
-        try {
-            writeAllRecordToBuffer(kafkaRecords,
-                    buffer, topicConfig);
-        } catch (Exception e) {
-            if (canRetry(e)) {
-                writeWithBackoff(kafkaRecords, buffer, topicConfig);
-            }
-            LOG.error("Error occurred while writing data to the buffer {}", e.getMessage());
-            kafkaConsumerWriteError.increment();
-        }
-    }
-
-    public synchronized void writeAllRecordToBuffer(List> kafkaRecords, final Buffer> buffer, final TopicConfig topicConfig) throws Exception {
-        buffer.writeAll(kafkaRecords,
-                topicConfig.getBufferDefaultTimeout().toSecondsPart());
-    }
-
-    public boolean canRetry(final Exception e) {
-        return (e instanceof IOException || e instanceof TimeoutException || e instanceof ExecutionException
-                || e instanceof InterruptedException);
-    }
-
-    public boolean writeWithBackoff(List> kafkaRecords, final Buffer> buffer, final TopicConfig topicConfig) throws Exception {
-        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-        long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis();
-        boolean flushedSuccessfully;
-
-        for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
-            final ScheduledFuture flushBufferFuture = scheduledExecutorService.schedule(() -> {
-                try {
-                    writeAllRecordToBuffer(kafkaRecords, buffer, topicConfig);
-                    return true;
-                } catch (final TimeoutException e) {
-                    return false;
-                }
-            }, nextDelay, TimeUnit.MILLISECONDS);
-
-            try {
-                flushedSuccessfully = flushBufferFuture.get();
-                if (flushedSuccessfully) {
-                    LOG.info("Successfully flushed the buffer accumulator on retry attempt {}", retryCount + 1);
-                    scheduledExecutorService.shutdownNow();
-                    return true;
-                }
-            } catch (final ExecutionException exp) {
-                LOG.warn("Retrying of flushing the buffer accumulator hit an exception: {}", exp);
-                scheduledExecutorService.shutdownNow();
-                throw exp;
-            } catch (final InterruptedException exp) {
-                LOG.warn("Retrying of flushing the buffer accumulator was interrupted: {}", exp);
-                scheduledExecutorService.shutdownNow();
-                throw exp;
-            }
-        }
-        LOG.warn("Flushing the bufferAccumulator failed after {} attempts", MAX_FLUSH_RETRIES_ON_IO_EXCEPTION);
-        scheduledExecutorService.shutdownNow();
-        return false;
-    }
-
-    public long commitOffsets(KafkaConsumer consumer, long lastCommitTime, Map offsetsToCommit) {
-        try {
-            long currentTimeMillis = System.currentTimeMillis();
-            if (currentTimeMillis - lastCommitTime > COMMIT_OFFSET_INTERVAL_MILLI_SEC) {
-                if(!offsetsToCommit.isEmpty()) {
-                    consumer.commitSync(offsetsToCommit);
-                    offsetsToCommit.clear();
-                    LOG.info("Succeeded to commit the offsets ...");
-                }
-                lastCommitTime = currentTimeMillis;
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to commit the offsets...", e);
-        }
-        return lastCommitTime;
-    }
-
-    public long processConsumerRecords(Map offsetsToCommit,
-                                       List> kafkaRecords,
-                                       long lastReadOffset, ConsumerRecord consumerRecord, List> partitionRecords) {
-        offsetsToCommit.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
-                new OffsetAndMetadata(consumerRecord.offset() + 1, null));
-        kafkaRecords.add(getEventRecord(consumerRecord.value()));
-        lastReadOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
-        return lastReadOffset;
-    }
-}
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java
deleted file mode 100644
index c09e133685..0000000000
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceBufferAccumulatorTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.dataprepper.plugins.kafka.source;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-import org.opensearch.dataprepper.metrics.PluginMetrics;
-import org.opensearch.dataprepper.model.buffer.Buffer;
-import org.opensearch.dataprepper.model.record.Record;
-import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
-import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
-import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-@SuppressWarnings("deprecation")
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-class KafkaSourceBufferAccumulatorTest {
-
-    @Mock
-    private KafkaSourceBufferAccumulator buffer;
-
-    @Mock
-    private KafkaSourceConfig sourceConfig;
-
-    @Mock
-    private TopicConfig topicConfig;
-    @Mock
-    private KafkaConsumer kafkaConsumer;
-
-    @Mock
-    List mockList = new ArrayList();
-    @Mock
-    private SchemaConfig schemaConfig;
-
-    @Mock
-    private PluginMetrics pluginMetrics;
-
-    @Mock
-    private Buffer> record;
-
-    @Mock
-    List> kafkaRecords;
-
-    @BeforeEach
-    void setUp() throws Exception {
-        when(sourceConfig.getTopics()).thenReturn((mockList));
-        when(mockList.get(0)).thenReturn(topicConfig);
-        when(sourceConfig.getSchemaConfig()).thenReturn(schemaConfig);
-
-        when(sourceConfig.getSchemaConfig()).thenReturn(mock(SchemaConfig.class));
-
-        buffer = new KafkaSourceBufferAccumulator<>(topicConfig, sourceConfig, "plaintext", pluginMetrics);
-    }
-
-    @Test
-    void testWriteEventOrStringToBuffer_plaintext_schemaType() throws Exception {
-        createObjectWithSchemaType("plaintext");
-
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).getEventRecord("anyString");
-        spyBuffer.getEventRecord("anyString");
-        verify(spyBuffer).getEventRecord("anyString");
-        assertNotNull(spyBuffer.getEventRecord("anyString"));
-    }
-
-    @Test
-    void testWriteEventOrStringToBuffer_json_schemaType() throws Exception {
-        String json = "{\"writebuffer\":\"true\",\"buffertype\":\"json\"}";
-        createObjectWithSchemaType("json"); //Added By Mehak
-
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).getEventRecord(json);
-        spyBuffer.getEventRecord(json);
-        verify(spyBuffer).getEventRecord(json);
-        assertNotNull(spyBuffer.getEventRecord(json));
-    }
-
-    @Test
-    void testWriteEventOrStringToBuffer_json_schemaType_catch_block() throws Exception {
-        createObjectWithSchemaType("json");
-
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).getEventRecord("anyString");
-        spyBuffer.getEventRecord("anyString");
-        verify(spyBuffer).getEventRecord("anyString");
-        assertNotNull(spyBuffer.getEventRecord("anyString"));
-    }
-
-    @Test
-    void testWriteEventOrStringToBuffer_plaintext_schemaType_catch_block() throws Exception {
-        createObjectWithSchemaType("plaintext");
-
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).getEventRecord(null);
-        spyBuffer.getEventRecord(null);
-        verify(spyBuffer).getEventRecord(null);
-        assertNotNull(spyBuffer.getEventRecord(null));
-    }
-
-    @Test
-    void testwrite()throws Exception{
-        TopicConfig topicConfig = new TopicConfig();
-        SchemaConfig schemaConfig = new SchemaConfig();
-        topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100));
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).write(kafkaRecords, record);
-        spyBuffer.write(kafkaRecords, record);
-        verify(spyBuffer).write(kafkaRecords, record);
-    }
-
-    private void createObjectWithSchemaType(String schema){
-
-        topicConfig = new TopicConfig();
-        schemaConfig = new SchemaConfig();
-        topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100));
-        sourceConfig.setSchemaConfig(schemaConfig);
-    }
-
-    @Test
-    void testwriteWithBackoff() throws Exception {
-        TopicConfig topicConfig = new TopicConfig();
-        Buffer> bufferObj = mock(Buffer.class);
-        topicConfig.setBufferDefaultTimeout(Duration.ofMillis(100));
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig);
-        spyBuffer.writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig);
-        verify(spyBuffer).writeWithBackoff(kafkaRecords, bufferObj, this.topicConfig);
-    }
-
-    @Test
-    void testPublishRecordToBuffer_commitOffsets() throws Exception {
-        topicConfig = new TopicConfig();
-        KafkaSourceBufferAccumulator spyBuffer = spy(buffer);
-        doCallRealMethod().when(spyBuffer).commitOffsets(kafkaConsumer, 0L, null);
-        spyBuffer.commitOffsets(kafkaConsumer, 0L, null);
-        verify(spyBuffer).commitOffsets(kafkaConsumer, 0L, null);
-    }
-}
diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
index eb2ed0a60f..fe3407471b 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
+++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml
@@ -39,7 +39,6 @@ log-pipeline:
         sasl:
           aws_msk_iam: role
           plaintext:
-            security_protocol: SASL_SSL
             username: 5UH4NID4OENKDIBI
             password: jCmncn77F9asfox3yhgZLCEwQ5fx8pKiXnszMqdt0y1GLrdZO1V1iz95aIe1UubX
           oauth: