From cd373f3eae19d68beb0f76bd67fd7d3304397eba Mon Sep 17 00:00:00 2001 From: Sumit Aich Date: Tue, 6 Dec 2022 14:13:54 +0530 Subject: [PATCH] feat: redis sink using depot library (#206) * feat: redis sink using depot library * docs: redis sink using depot library * chore: version bump * fix: remove descriptors * chore: version bump * chore: version bump --- build.gradle | 4 +- docs/docs/sinks/redis-sink.md | 85 +---- .../odpf/firehose/config/RedisSinkConfig.java | 43 --- .../converter/RedisSinkDataTypeConverter.java | 13 - .../RedisSinkDeploymentTypeConverter.java | 13 - .../converter/RedisSinkTtlTypeConverter.java | 13 - .../config/enums/RedisSinkDataType.java | 7 - .../config/enums/RedisSinkDeploymentType.java | 6 - .../config/enums/RedisSinkTtlType.java | 7 - .../io/odpf/firehose/sink/SinkFactory.java | 14 +- .../odpf/firehose/sink/redis/RedisSink.java | 57 --- .../firehose/sink/redis/RedisSinkFactory.java | 51 --- .../sink/redis/client/RedisClient.java | 29 -- .../sink/redis/client/RedisClientFactory.java | 79 ---- .../sink/redis/client/RedisClusterClient.java | 55 --- .../redis/client/RedisStandaloneClient.java | 67 ---- .../sink/redis/dataentry/RedisDataEntry.java | 27 -- .../dataentry/RedisHashSetFieldEntry.java | 35 -- .../redis/dataentry/RedisKeyValueEntry.java | 48 --- .../sink/redis/dataentry/RedisListEntry.java | 33 -- .../redis/exception/NoResponseException.java | 16 - .../redis/parsers/RedisHashSetParser.java | 48 --- .../redis/parsers/RedisKeyValueParser.java | 37 -- .../sink/redis/parsers/RedisListParser.java | 48 --- .../sink/redis/parsers/RedisParser.java | 128 ------- .../redis/parsers/RedisParserFactory.java | 35 -- .../firehose/sink/redis/ttl/DurationTtl.java | 23 -- .../firehose/sink/redis/ttl/ExactTimeTtl.java | 23 -- .../firehose/sink/redis/ttl/NoRedisTtl.java | 15 - .../sink/redis/ttl/RedisTTLFactory.java | 22 -- .../firehose/sink/redis/ttl/RedisTtl.java | 13 - .../RedisSinkDataTypeConverterTest.java | 47 --- .../sink/redis/RedisSinkFactoryTest.java | 41 --- .../firehose/sink/redis/RedisSinkTest.java | 98 ----- .../redis/client/RedisClientFactoryTest.java | 118 ------ .../redis/client/RedisClusterClientTest.java | 114 ------ .../client/RedisStandaloneClientTest.java | 194 ---------- .../dataentry/RedisHashSetFieldEntryTest.java | 105 ------ .../dataentry/RedisKeyValueEntryTest.java | 100 ------ .../redis/dataentry/RedisListEntryTest.java | 94 ----- .../redis/parsers/RedisHashSetParserTest.java | 336 ------------------ .../parsers/RedisKeyValueParserTest.java | 97 ----- .../redis/parsers/RedisListParserTest.java | 107 ------ .../redis/parsers/RedisParserFactoryTest.java | 84 ----- .../sink/redis/ttl/DurationTTLTest.java | 41 --- .../sink/redis/ttl/ExactTimeTTLTest.java | 39 -- .../sink/redis/ttl/RedisTtlFactoryTest.java | 61 ---- 47 files changed, 26 insertions(+), 2744 deletions(-) delete mode 100644 src/main/java/io/odpf/firehose/config/RedisSinkConfig.java delete mode 100644 src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java delete mode 100644 src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java delete mode 100644 src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java delete mode 100644 src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java delete mode 100644 src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java delete mode 100644 src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/RedisSink.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java delete mode 100644 src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java delete mode 100644 src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java delete mode 100644 src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java diff --git a/build.gradle b/build.gradle index 129e3a3da..a9aac4646 100644 --- a/build.gradle +++ b/build.gradle @@ -33,7 +33,7 @@ lombok { } group 'io.odpf' -version '0.6.1' +version '0.7.0' def projName = "firehose" @@ -101,7 +101,7 @@ dependencies { implementation 'com.google.cloud:google-cloud-storage:1.114.0' implementation 'com.google.cloud:google-cloud-bigquery:1.115.0' implementation 'org.apache.logging.log4j:log4j-core:2.17.1' - implementation group: 'io.odpf', name: 'depot', version: '0.3.4' + implementation group: 'io.odpf', name: 'depot', version: '0.3.5' implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j' testImplementation group: 'junit', name: 'junit', version: '4.11' diff --git a/docs/docs/sinks/redis-sink.md b/docs/docs/sinks/redis-sink.md index d0ab553d7..ec9d8789c 100644 --- a/docs/docs/sinks/redis-sink.md +++ b/docs/docs/sinks/redis-sink.md @@ -1,80 +1,21 @@ -# Redis +# Redis Sink -A Redis sink Firehose \(`SINK_TYPE`=`redis`\) requires the following variables to be set along with Generic ones +Redis Sink is implemented in Firehose using the Redis sink connector implementation in ODPF Depot. You can check out ODPF Depot Github repository [here](https://github.com/odpf/depot). -### `SINK_REDIS_URLS` +### Data Types +Redis sink can be created in 3 different modes based on the value of [`SINK_REDIS_DATA_TYPE`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_data_type): HashSet, KeyValue or List +- `Hashset`: For each message, an entry of the format `key : field : value` is generated and pushed to Redis. Field and value are generated on the basis of the config [`SINK_REDIS_HASHSET_FIELD_TO_COLUMN_MAPPING`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_hashset_field_to_column_mapping) +- `List`: For each message entry of the format `key : value` is generated and pushed to Redis. Value is fetched for the Proto field name provided in the config [`SINK_REDIS_LIST_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_list_data_field_name) +- `KeyValue`: For each message entry of the format `key : value` is generated and pushed to Redis. Value is fetched for the proto field name provided in the config [`SINK_REDIS_KEY_VALUE_DATA_FIELD_NAME`](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md#sink_redis_key_value_data_field_name) -REDIS instance hostname/IP address followed by its port. +The `key` is picked up from a field in the message itself. -- Example value: `localhos:6379,localhost:6380` -- Type: `required` +Limitation: Depot Redis sink only supports Key-Value, HashSet and List entries as of now. -### `SINK_REDIS_DATA_TYPE` +### Configuration -To select whether you want to push your data as a HashSet or as a List. +For Redis sink in Firehose we need to set first (`SINK_TYPE`=`redis`). There are some generic configs which are common across different sink types which need to be set which are mentioned in [generic.md](../advance/generic.md). Redis sink specific configs are mentioned in ODPF Depot repository. You can check out the Redis Sink configs [here](https://github.com/odpf/depot/blob/main/docs/reference/configuration/redis.md) -- Example value: `Hashset` -- Type: `required` -- Default value: `List` -### `SINK_REDIS_KEY_TEMPLATE` - -The string that will act as the key for each Redis entry. This key can be configured as per the requirement, a constant or can extract value from each message and use that as the Redis key. - -- Example value: `Service\_%%s,1` - - This will take the value with index 1 from proto and create the Redis keys as per the template\ - -- Type: `required` - -### `INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING` - -This is the field that decides what all data will be stored in the HashSet for each message. - -- Example value: `{"6":"customer_id", "2":"order_num"}` -- Type: `required (For Hashset)` - -### `SINK_REDIS_LIST_DATA_PROTO_INDEX` - -This field decides what all data will be stored in the List for each message. - -- Example value: `6` - - This will get the value of the field with index 6 in your proto and push that to the Redis list with the corresponding keyTemplate\ - -- Type: `required (For List)` - -### `SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX` - -This field decides what data will be stored in the value part of key-value pair - -- Example value: `6` - - This will get the value of the field with index 6 in your proto and push that to the Redis as value with the corresponding keyTemplate\ - -- Type: `required (For KeyValue)` - -### `SINK_REDIS_TTL_TYPE` - -- Example value: `DURATION` -- Type: `optional` -- Default value: `DISABLE` -- Choice of Redis TTL type.It can be:\ - - `DURATION`: After which the Key will be expired and removed from Redis \(UNIT- seconds\)\ - - `EXACT_TIME`: Precise UNIX timestamp after which the Key will be expired - -### `SINK_REDIS_TTL_VALUE` - -Redis TTL value in Unix Timestamp for `EXACT_TIME` TTL type, In Seconds for `DURATION` TTL type. - -- Example value: `100000` -- Type: `optional` -- Default value: `0` - -### `SINK_REDIS_DEPLOYMENT_TYPE` - -The Redis deployment you are using. At present, we support `Standalone` and `Cluster` types. - -- Example value: `Standalone` -- Type: `required` -- Default value: `Standalone` +### Deployment Types +Redis sink, as of now, supports two different Deployment Types `Standalone` and `Cluster`. This can be configured in the Depot environment variable `SINK_REDIS_DEPLOYMENT_TYPE`. diff --git a/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java b/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java deleted file mode 100644 index 7e6201984..000000000 --- a/src/main/java/io/odpf/firehose/config/RedisSinkConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -package io.odpf.firehose.config; - -import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter; -import io.odpf.firehose.config.converter.RedisSinkTtlTypeConverter; -import io.odpf.firehose.config.converter.RedisSinkDeploymentTypeConverter; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import io.odpf.firehose.config.enums.RedisSinkTtlType; -import io.odpf.firehose.config.enums.RedisSinkDeploymentType; - -public interface RedisSinkConfig extends AppConfig { - @Key("SINK_REDIS_URLS") - String getSinkRedisUrls(); - - @Key("SINK_REDIS_KEY_TEMPLATE") - String getSinkRedisKeyTemplate(); - - @Key("SINK_REDIS_DATA_TYPE") - @DefaultValue("HASHSET") - @ConverterClass(RedisSinkDataTypeConverter.class) - RedisSinkDataType getSinkRedisDataType(); - - @Key("SINK_REDIS_LIST_DATA_PROTO_INDEX") - String getSinkRedisListDataProtoIndex(); - - @Key("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX") - String getSinkRedisKeyValuetDataProtoIndex(); - - @Key("SINK_REDIS_TTL_TYPE") - @DefaultValue("DISABLE") - @ConverterClass(RedisSinkTtlTypeConverter.class) - RedisSinkTtlType getSinkRedisTtlType(); - - @Key("SINK_REDIS_TTL_VALUE") - @DefaultValue("0") - long getSinkRedisTtlValue(); - - @Key("SINK_REDIS_DEPLOYMENT_TYPE") - @DefaultValue("Standalone") - @ConverterClass(RedisSinkDeploymentTypeConverter.class) - RedisSinkDeploymentType getSinkRedisDeploymentType(); - - -} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java deleted file mode 100644 index 06b64889b..000000000 --- a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDataTypeConverter.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.odpf.firehose.config.converter; - -import io.odpf.firehose.config.enums.RedisSinkDataType; -import org.aeonbits.owner.Converter; - -import java.lang.reflect.Method; - -public class RedisSinkDataTypeConverter implements Converter { - @Override - public RedisSinkDataType convert(Method method, String input) { - return RedisSinkDataType.valueOf(input.toUpperCase()); - } -} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java deleted file mode 100644 index fcd476ac4..000000000 --- a/src/main/java/io/odpf/firehose/config/converter/RedisSinkDeploymentTypeConverter.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.odpf.firehose.config.converter; - -import io.odpf.firehose.config.enums.RedisSinkDeploymentType; -import org.aeonbits.owner.Converter; - -import java.lang.reflect.Method; - -public class RedisSinkDeploymentTypeConverter implements Converter { - @Override - public RedisSinkDeploymentType convert(Method method, String input) { - return RedisSinkDeploymentType.valueOf(input.toUpperCase()); - } -} diff --git a/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java b/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java deleted file mode 100644 index 8ae0af293..000000000 --- a/src/main/java/io/odpf/firehose/config/converter/RedisSinkTtlTypeConverter.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.odpf.firehose.config.converter; - -import io.odpf.firehose.config.enums.RedisSinkTtlType; -import org.aeonbits.owner.Converter; - -import java.lang.reflect.Method; - -public class RedisSinkTtlTypeConverter implements Converter { - @Override - public RedisSinkTtlType convert(Method method, String input) { - return RedisSinkTtlType.valueOf(input.toUpperCase()); - } -} diff --git a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java deleted file mode 100644 index c8fb593b3..000000000 --- a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDataType.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.odpf.firehose.config.enums; - -public enum RedisSinkDataType { - LIST, - HASHSET, - KEYVALUE, -} diff --git a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java deleted file mode 100644 index b2f9448aa..000000000 --- a/src/main/java/io/odpf/firehose/config/enums/RedisSinkDeploymentType.java +++ /dev/null @@ -1,6 +0,0 @@ -package io.odpf.firehose.config.enums; - -public enum RedisSinkDeploymentType { - STANDALONE, - CLUSTER -} diff --git a/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java b/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java deleted file mode 100644 index 0507ca9fd..000000000 --- a/src/main/java/io/odpf/firehose/config/enums/RedisSinkTtlType.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.odpf.firehose.config.enums; - -public enum RedisSinkTtlType { - EXACT_TIME, - DURATION, - DISABLE -} diff --git a/src/main/java/io/odpf/firehose/sink/SinkFactory.java b/src/main/java/io/odpf/firehose/sink/SinkFactory.java index 7029e9c19..579283d89 100644 --- a/src/main/java/io/odpf/firehose/sink/SinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/SinkFactory.java @@ -3,12 +3,15 @@ import io.odpf.depot.bigquery.BigQuerySink; import io.odpf.depot.bigquery.BigQuerySinkFactory; import io.odpf.depot.config.BigQuerySinkConfig; +import io.odpf.depot.config.RedisSinkConfig; import io.odpf.depot.bigtable.BigTableSinkFactory; import io.odpf.depot.bigtable.BigTableSink; import io.odpf.depot.config.BigTableSinkConfig; import io.odpf.depot.log.LogSink; import io.odpf.depot.log.LogSinkFactory; import io.odpf.depot.metrics.StatsDReporter; +import io.odpf.depot.redis.RedisSink; +import io.odpf.depot.redis.RedisSinkFactory; import io.odpf.firehose.config.KafkaConsumerConfig; import io.odpf.firehose.config.enums.SinkType; import io.odpf.firehose.consumer.kafka.OffsetManager; @@ -23,7 +26,6 @@ import io.odpf.firehose.sink.jdbc.JdbcSinkFactory; import io.odpf.firehose.sink.mongodb.MongoSinkFactory; import io.odpf.firehose.sink.prometheus.PromSinkFactory; -import io.odpf.firehose.sink.redis.RedisSinkFactory; import io.odpf.stencil.client.StencilClient; import org.aeonbits.owner.ConfigFactory; @@ -38,6 +40,7 @@ public class SinkFactory { private BigQuerySinkFactory bigQuerySinkFactory; private BigTableSinkFactory bigTableSinkFactory; private LogSinkFactory logSinkFactory; + private RedisSinkFactory redisSinkFactory; private final Map config; public SinkFactory(KafkaConsumerConfig kafkaConsumerConfig, @@ -61,7 +64,6 @@ public void init() { case HTTP: case INFLUXDB: case ELASTICSEARCH: - case REDIS: case GRPC: case PROMETHEUS: case BLOB: @@ -71,6 +73,12 @@ public void init() { logSinkFactory = new LogSinkFactory(config, statsDReporter); logSinkFactory.init(); return; + case REDIS: + redisSinkFactory = new RedisSinkFactory( + ConfigFactory.create(RedisSinkConfig.class, config), + statsDReporter); + redisSinkFactory.init(); + return; case BIGQUERY: BigquerySinkUtils.addMetadataColumns(config); bigQuerySinkFactory = new BigQuerySinkFactory( @@ -105,7 +113,7 @@ public Sink getSink() { case ELASTICSEARCH: return EsSinkFactory.create(config, statsDReporter, stencilClient); case REDIS: - return RedisSinkFactory.create(config, statsDReporter, stencilClient); + return new GenericOdpfSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), sinkType.name(), redisSinkFactory.create()); case GRPC: return GrpcSinkFactory.create(config, statsDReporter, stencilClient); case PROMETHEUS: diff --git a/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java b/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java deleted file mode 100644 index 808631c1a..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/RedisSink.java +++ /dev/null @@ -1,57 +0,0 @@ -package io.odpf.firehose.sink.redis; - -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.AbstractSink; -import io.odpf.firehose.sink.redis.client.RedisClient; -import io.odpf.firehose.sink.redis.exception.NoResponseException; - -import java.util.List; - -/** - * RedisSink allows messages consumed from kafka to be persisted to a redis. - * The related configurations for RedisSink can be found here: {@see io.odpf.firehose.config.RedisSinkConfig} - */ -public class RedisSink extends AbstractSink { - - private RedisClient redisClient; - - /** - * Instantiates a new Redis sink. - * - * @param firehoseInstrumentation the instrumentation - * @param sinkType the sink type - * @param redisClient the redis client - */ - public RedisSink(FirehoseInstrumentation firehoseInstrumentation, String sinkType, RedisClient redisClient) { - super(firehoseInstrumentation, sinkType); - this.redisClient = redisClient; - } - - /** - * process messages before sending to redis. - * - * @param messages the messages - */ - @Override - protected void prepare(List messages) { - redisClient.prepare(messages); - } - - /** - * Send data to redis. - * - * @return the list - * @throws NoResponseException the no response exception - */ - @Override - protected List execute() throws NoResponseException { - return redisClient.execute(); - } - - @Override - public void close() { - getFirehoseInstrumentation().logInfo("Redis connection closing"); - redisClient.close(); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java b/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java deleted file mode 100644 index 741fe695d..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/RedisSinkFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.odpf.firehose.sink.redis; - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.AbstractSink; -import io.odpf.firehose.sink.redis.client.RedisClient; -import io.odpf.firehose.sink.redis.client.RedisClientFactory; -import io.odpf.stencil.client.StencilClient; -import org.aeonbits.owner.ConfigFactory; - -import java.util.Map; - -/** - * Factory class to create the RedisSink. - *

- * The firehose would reflectively instantiate this factory - * using the configurations supplied and invoke {@see #create(Map < String, String > configuration, StatsDClient statsDReporter, StencilClient client)} - * to obtain the RedisSink implementation. - */ -public class RedisSinkFactory { - - /** - * Creates Redis sink. - * - * @param configuration the configuration - * @param statsDReporter the stats d reporter - * @param stencilClient the stencil client - * @return the abstract sink - */ - public static AbstractSink create(Map configuration, StatsDReporter statsDReporter, StencilClient stencilClient) { - RedisSinkConfig redisSinkConfig = ConfigFactory.create(RedisSinkConfig.class, configuration); - FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisSinkFactory.class); - String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.type = %s" - + "\n\tredis.list.data.proto.index = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d", - redisSinkConfig.getSinkRedisUrls(), - redisSinkConfig.getSinkRedisKeyTemplate(), - redisSinkConfig.getSinkRedisDataType().toString(), - redisSinkConfig.getSinkRedisListDataProtoIndex(), - redisSinkConfig.getSinkRedisTtlType().toString(), - redisSinkConfig.getSinkRedisTtlValue()); - firehoseInstrumentation.logDebug(redisConfig); - firehoseInstrumentation.logInfo("Redis server type = {}", redisSinkConfig.getSinkRedisDeploymentType()); - - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - RedisClient client = redisClientFactory.getClient(); - firehoseInstrumentation.logInfo("Connection to redis established successfully"); - return new RedisSink(new FirehoseInstrumentation(statsDReporter, RedisSink.class), "redis", client); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java deleted file mode 100644 index 66b663734..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClient.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.firehose.message.Message; - -import java.util.List; - -/** - * Redis client interface to be used in RedisSink. - */ -public interface RedisClient { - /** - * Process messages before sending. - * - * @param messages the messages - */ - void prepare(List messages); - - /** - * Sends the processed messages to redis. - * - * @return list of messages - */ - List execute(); - - /** - * Close the client. - */ - void close(); -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java deleted file mode 100644 index 7d3fac23e..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClientFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkDeploymentType; -import io.odpf.firehose.exception.ConfigurationException; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.proto.ProtoToFieldMapper; -import io.odpf.firehose.sink.redis.parsers.RedisParser; -import io.odpf.firehose.sink.redis.parsers.RedisParserFactory; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import io.odpf.firehose.sink.redis.ttl.RedisTTLFactory; -import io.odpf.stencil.client.StencilClient; -import io.odpf.stencil.Parser; -import org.apache.commons.lang.StringUtils; -import redis.clients.jedis.HostAndPort; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisCluster; - -import java.util.HashSet; - -/** - * Redis client factory. - */ -public class RedisClientFactory { - - private static final String DELIMITER = ","; - private StatsDReporter statsDReporter; - private RedisSinkConfig redisSinkConfig; - private StencilClient stencilClient; - - /** - * Instantiates a new Redis client factory. - * - * @param statsDReporter the statsd reporter - * @param redisSinkConfig the redis sink config - * @param stencilClient the stencil client - */ - public RedisClientFactory(StatsDReporter statsDReporter, RedisSinkConfig redisSinkConfig, StencilClient stencilClient) { - this.statsDReporter = statsDReporter; - this.redisSinkConfig = redisSinkConfig; - this.stencilClient = stencilClient; - } - - public RedisClient getClient() { - Parser protoParser = stencilClient.getParser(redisSinkConfig.getInputSchemaProtoClass()); - ProtoToFieldMapper protoToFieldMapper = new ProtoToFieldMapper(protoParser, redisSinkConfig.getInputSchemaProtoToColumnMapping()); - RedisParser redisParser = RedisParserFactory.getParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter); - RedisSinkDeploymentType redisSinkDeploymentType = redisSinkConfig.getSinkRedisDeploymentType(); - RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); - return RedisSinkDeploymentType.CLUSTER.equals(redisSinkDeploymentType) - ? getRedisClusterClient(redisParser, redisTTL) - : getRedisStandaloneClient(redisParser, redisTTL); - } - - private RedisStandaloneClient getRedisStandaloneClient(RedisParser redisParser, RedisTtl redisTTL) { - Jedis jedis = null; - try { - jedis = new Jedis(HostAndPort.parseString(StringUtils.trim(redisSinkConfig.getSinkRedisUrls()))); - } catch (IllegalArgumentException e) { - throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls())); - } - return new RedisStandaloneClient(new FirehoseInstrumentation(statsDReporter, RedisStandaloneClient.class), redisParser, redisTTL, jedis); - } - - private RedisClusterClient getRedisClusterClient(RedisParser redisParser, RedisTtl redisTTL) { - String[] redisUrls = redisSinkConfig.getSinkRedisUrls().split(DELIMITER); - HashSet nodes = new HashSet<>(); - try { - for (String redisUrl : redisUrls) { - nodes.add(HostAndPort.parseString(StringUtils.trim(redisUrl))); - } - } catch (IllegalArgumentException e) { - throw new ConfigurationException(String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls())); - } - JedisCluster jedisCluster = new JedisCluster(nodes); - return new RedisClusterClient(new FirehoseInstrumentation(statsDReporter, RedisClusterClient.class), redisParser, redisTTL, jedisCluster); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java deleted file mode 100644 index f72beb535..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/client/RedisClusterClient.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.parsers.RedisParser; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import redis.clients.jedis.JedisCluster; - -import java.util.ArrayList; -import java.util.List; - -/** - * Redis cluster client. - */ -public class RedisClusterClient implements RedisClient { - - private FirehoseInstrumentation firehoseInstrumentation; - private RedisParser redisParser; - private RedisTtl redisTTL; - private JedisCluster jedisCluster; - private List redisDataEntries; - - /** - * Instantiates a new Redis cluster client. - * - * @param firehoseInstrumentation the instrumentation - * @param redisParser the redis parser - * @param redisTTL the redis ttl - * @param jedisCluster the jedis cluster - */ - public RedisClusterClient(FirehoseInstrumentation firehoseInstrumentation, RedisParser redisParser, RedisTtl redisTTL, JedisCluster jedisCluster) { - this.firehoseInstrumentation = firehoseInstrumentation; - this.redisParser = redisParser; - this.redisTTL = redisTTL; - this.jedisCluster = jedisCluster; - } - - @Override - public void prepare(List messages) { - redisDataEntries = redisParser.parse(messages); - } - - @Override - public List execute() { - redisDataEntries.forEach(redisDataEntry -> redisDataEntry.pushMessage(jedisCluster, redisTTL)); - return new ArrayList<>(); - } - - @Override - public void close() { - firehoseInstrumentation.logInfo("Closing Jedis client"); - jedisCluster.close(); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java b/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java deleted file mode 100644 index ba6498467..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClient.java +++ /dev/null @@ -1,67 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.exception.NoResponseException; -import io.odpf.firehose.sink.redis.parsers.RedisParser; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; - -import java.util.ArrayList; -import java.util.List; - -/** - * Redis standalone client. - */ -public class RedisStandaloneClient implements RedisClient { - - private FirehoseInstrumentation firehoseInstrumentation; - private RedisParser redisParser; - private RedisTtl redisTTL; - private Jedis jedis; - private Pipeline jedisPipelined; - - /** - * Instantiates a new Redis standalone client. - * - * @param firehoseInstrumentation the instrumentation - * @param redisParser the redis parser - * @param redisTTL the redis ttl - * @param jedis the jedis - */ - public RedisStandaloneClient(FirehoseInstrumentation firehoseInstrumentation, RedisParser redisParser, RedisTtl redisTTL, Jedis jedis) { - this.firehoseInstrumentation = firehoseInstrumentation; - this.redisParser = redisParser; - this.redisTTL = redisTTL; - this.jedis = jedis; - } - - @Override - public void prepare(List messages) { - List redisDataEntries = redisParser.parse(messages); - jedisPipelined = jedis.pipelined(); - - jedisPipelined.multi(); - redisDataEntries.forEach(redisDataEntry -> redisDataEntry.pushMessage(jedisPipelined, redisTTL)); - } - - @Override - public List execute() { - Response> responses = jedisPipelined.exec(); - firehoseInstrumentation.logDebug("jedis responses: {}", responses); - jedisPipelined.sync(); - if (responses.get() == null || responses.get().isEmpty()) { - throw new NoResponseException(); - } - return new ArrayList<>(); - } - - @Override - public void close() { - firehoseInstrumentation.logInfo("Closing Jedis client"); - jedis.close(); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java deleted file mode 100644 index 45e06d2eb..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisDataEntry.java +++ /dev/null @@ -1,27 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -/** - * The interface Redis data entry. - */ -public interface RedisDataEntry { - - /** - * Push messages to jedis pipeline. - * - * @param jedisPipelined the jedis pipelined - * @param redisTTL the redis ttl - */ - void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL); - - /** - * Push message to jedis cluster. - * - * @param jedisCluster the jedis cluster - * @param redisTTL the redis ttl - */ - void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL); -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java deleted file mode 100644 index fac75fc9e..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntry.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import lombok.AllArgsConstructor; -import lombok.Getter; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -/** - * Class for Redis Hash set entry. - */ -@AllArgsConstructor -@Getter -public class RedisHashSetFieldEntry implements RedisDataEntry { - - private String key; - private String field; - private String value; - private FirehoseInstrumentation firehoseInstrumentation; - - @Override - public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { - getFirehoseInstrumentation().logDebug("key: {}, field: {}, value: {}", getKey(), getField(), getValue()); - jedisPipelined.hset(getKey(), getField(), getValue()); - redisTTL.setTtl(jedisPipelined, getKey()); - } - - @Override - public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { - getFirehoseInstrumentation().logDebug("key: {}, field: {}, value: {}", getKey(), getField(), getValue()); - jedisCluster.hset(getKey(), getField(), getValue()); - redisTTL.setTtl(jedisCluster, getKey()); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java deleted file mode 100644 index 18d23f059..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntry.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -@AllArgsConstructor -@Getter -@EqualsAndHashCode(exclude = "firehoseInstrumentation") -public class RedisKeyValueEntry implements RedisDataEntry { - - private String key; - private String value; - private FirehoseInstrumentation firehoseInstrumentation; - - @Override - public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { - firehoseInstrumentation.logDebug("key: {}, value: {}", key, value); - jedisPipelined.set(key, value); - redisTTL.setTtl(jedisPipelined, key); - } - - @Override - public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { - firehoseInstrumentation.logDebug("key: {}, value: {}", key, value); - jedisCluster.set(key, value); - redisTTL.setTtl(jedisCluster, key); - - } - - @Override - public String toString() { - return "RedisKeyValueEntry{" - + - "key='" - + key - + '\'' - + - ", value='" + value - + '\'' - + - '}'; - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java b/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java deleted file mode 100644 index c6c9ee163..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntry.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import lombok.AllArgsConstructor; -import lombok.Getter; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -/** - * Class for Redis Hash set entry. - */ -@AllArgsConstructor -@Getter -public class RedisListEntry implements RedisDataEntry { - private String key; - private String value; - private FirehoseInstrumentation firehoseInstrumentation; - - @Override - public void pushMessage(Pipeline jedisPipelined, RedisTtl redisTTL) { - getFirehoseInstrumentation().logDebug("key: {}, value: {}", getKey(), getValue()); - jedisPipelined.lpush(getKey(), getValue()); - redisTTL.setTtl(jedisPipelined, getKey()); - } - - @Override - public void pushMessage(JedisCluster jedisCluster, RedisTtl redisTTL) { - getFirehoseInstrumentation().logDebug("key: {}, value: {}", getKey(), getValue()); - jedisCluster.lpush(getKey(), getValue()); - redisTTL.setTtl(jedisCluster, getKey()); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java b/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java deleted file mode 100644 index 0fcfd1672..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/exception/NoResponseException.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.odpf.firehose.sink.redis.exception; - -/** - * NoResponseException - *

- * Exception to raise if there is no responds from redisClient. - */ -public class NoResponseException extends RuntimeException { - - /** - * Instantiates a new No response exception. - */ - public NoResponseException() { - super("Redis Pipeline error: no responds received"); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java deleted file mode 100644 index 3478e922f..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParser.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.proto.ProtoToFieldMapper; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; -import com.google.protobuf.DynamicMessage; -import io.odpf.stencil.Parser; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Redis hash set parser. - */ -public class RedisHashSetParser extends RedisParser { - private ProtoToFieldMapper protoToFieldMapper; - private RedisSinkConfig redisSinkConfig; - private StatsDReporter statsDReporter; - - /** - * Instantiates a new Redis hash set parser. - * @param protoToFieldMapper the proto to field mapper - * @param protoParser the proto parser - * @param redisSinkConfig the redis sink config - * @param statsDReporter the statsd reporter - */ - public RedisHashSetParser(ProtoToFieldMapper protoToFieldMapper, Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { - super(protoParser, redisSinkConfig); - this.protoToFieldMapper = protoToFieldMapper; - this.redisSinkConfig = redisSinkConfig; - this.statsDReporter = statsDReporter; - } - - @Override - public List parse(Message message) { - DynamicMessage parsedMessage = parseEsbMessage(message); - String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); - List messageEntries = new ArrayList<>(); - Map protoToFieldMap = protoToFieldMapper.getFields(getPayload(message)); - protoToFieldMap.forEach((key, value) -> messageEntries.add(new RedisHashSetFieldEntry(redisKey, parseTemplate(parsedMessage, key), String.valueOf(value), new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)))); - return messageEntries; - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java deleted file mode 100644 index f5447fe78..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParser.java +++ /dev/null @@ -1,37 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - -import com.google.protobuf.DynamicMessage; -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry; -import io.odpf.stencil.Parser; - -import java.util.Collections; -import java.util.List; - -public class RedisKeyValueParser extends RedisParser { - private RedisSinkConfig redisSinkConfig; - private StatsDReporter statsDReporter; - - public RedisKeyValueParser(Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { - super(protoParser, redisSinkConfig); - this.redisSinkConfig = redisSinkConfig; - this.statsDReporter = statsDReporter; - } - - @Override - public List parse(Message message) { - DynamicMessage parsedMessage = parseEsbMessage(message); - String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); - String protoIndex = redisSinkConfig.getSinkRedisKeyValuetDataProtoIndex(); - if (protoIndex == null) { - throw new IllegalArgumentException("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink"); - } - FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, RedisKeyValueEntry.class); - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(redisKey, getDataByFieldNumber(parsedMessage, protoIndex).toString(), firehoseInstrumentation); - return Collections.singletonList(redisKeyValueEntry); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java deleted file mode 100644 index 760c26384..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisListParser.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; -import com.google.protobuf.DynamicMessage; -import io.odpf.stencil.Parser; - -import java.util.ArrayList; -import java.util.List; - -/** - * Redis list parser. - */ -public class RedisListParser extends RedisParser { - private RedisSinkConfig redisSinkConfig; - private StatsDReporter statsDReporter; - - /** - * Instantiates a new Redis list parser. - * - * @param protoParser the proto parser - * @param redisSinkConfig the redis sink config - * @param statsDReporter the stats d reporter - */ - public RedisListParser(Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { - super(protoParser, redisSinkConfig); - this.redisSinkConfig = redisSinkConfig; - this.statsDReporter = statsDReporter; - } - - @Override - public List parse(Message message) { - DynamicMessage parsedMessage = parseEsbMessage(message); - String redisKey = parseTemplate(parsedMessage, redisSinkConfig.getSinkRedisKeyTemplate()); - String protoIndex = redisSinkConfig.getSinkRedisListDataProtoIndex(); - if (protoIndex == null) { - throw new IllegalArgumentException("Please provide SINK_REDIS_LIST_DATA_PROTO_INDEX in list sink"); - } - List messageEntries = new ArrayList<>(); - messageEntries.add(new RedisListEntry(redisKey, getDataByFieldNumber(parsedMessage, protoIndex).toString(), new FirehoseInstrumentation(statsDReporter, RedisListEntry.class))); - return messageEntries; - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java deleted file mode 100644 index b027c46b2..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParser.java +++ /dev/null @@ -1,128 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - - -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import com.google.protobuf.Descriptors; -import com.google.protobuf.DynamicMessage; -import com.google.protobuf.InvalidProtocolBufferException; -import io.odpf.stencil.Parser; -import lombok.AllArgsConstructor; -import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.common.errors.InvalidConfigurationException; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Convert kafka messages to RedisDataEntry. - */ -@AllArgsConstructor -public abstract class RedisParser { - - private Parser protoParser; - private RedisSinkConfig redisSinkConfig; - - public abstract List parse(Message message); - - public List parse(List messages) { - return messages - .stream() - .map(this::parse) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - } - - /** - * Parse esb message to protobuf. - * - * @param message parsed message - * @return Parsed Proto object - */ - DynamicMessage parseEsbMessage(Message message) { - DynamicMessage parsedMessage; - try { - parsedMessage = protoParser.parse(getPayload(message)); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException("Unable to parse data when reading Key", e); - } - return parsedMessage; - } - - /** - * Parse template string. - * - * @param data the data - * @param template the template - * @return parsed template - */ - String parseTemplate(DynamicMessage data, String template) { - if (StringUtils.isEmpty(template)) { - throw new IllegalArgumentException("Template '" + template + "' is invalid"); - } - String[] templateStrings = template.split(","); - if (templateStrings.length == 0) { - throw new InvalidConfigurationException("Empty key configuration: '" + template + "'"); - } - templateStrings = Arrays - .stream(templateStrings) - .map(String::trim) - .toArray(String[]::new); - String templatePattern = templateStrings[0]; - String templateVariables = StringUtils.join(Arrays.copyOfRange(templateStrings, 1, templateStrings.length), ","); - String renderedTemplate = renderStringTemplate(data, templatePattern, templateVariables); - return StringUtils.isEmpty(templateVariables) - ? templatePattern - : renderedTemplate; - } - - private String renderStringTemplate(DynamicMessage parsedMessage, String pattern, String patternVariables) { - if (StringUtils.isEmpty(patternVariables)) { - return pattern; - } - List patternVariableFieldNumbers = Arrays.asList(patternVariables.split(",")); - Object[] patternVariableData = patternVariableFieldNumbers - .stream() - .map(fieldNumber -> getDataByFieldNumber(parsedMessage, fieldNumber)) - .toArray(); - return String.format(pattern, patternVariableData); - } - - /** - * Gets data by field number. - * - * @param parsedMessage the parsed message - * @param fieldNumber the field number - * @return Data object - */ - Object getDataByFieldNumber(DynamicMessage parsedMessage, String fieldNumber) { - int fieldNumberInt; - try { - fieldNumberInt = Integer.parseInt(fieldNumber); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid Proto Index"); - } - Descriptors.FieldDescriptor fieldDescriptor = parsedMessage.getDescriptorForType().findFieldByNumber(fieldNumberInt); - if (fieldDescriptor == null) { - throw new IllegalArgumentException(String.format("Descriptor not found for index: %s", fieldNumber)); - } - return parsedMessage.getField(fieldDescriptor); - } - - /** - * Get payload bytes. - * - * @param message the message - * @return binary payload - */ - byte[] getPayload(Message message) { - if (redisSinkConfig.getKafkaRecordParserMode().equals("key")) { - return message.getLogKey(); - } else { - return message.getLogMessage(); - } - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java b/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java deleted file mode 100644 index edfb62e79..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.proto.ProtoToFieldMapper; -import io.odpf.stencil.Parser; - -/** - * Redis parser factory. - */ -public class RedisParserFactory { - - /** - * Gets parser. - * - * @param protoToFieldMapper the proto to field mapper - * @param protoParser the proto parser - * @param redisSinkConfig the redis sink config - * @param statsDReporter the statsd reporter - * @return RedisParser - */ - public static RedisParser getParser(ProtoToFieldMapper protoToFieldMapper, Parser protoParser, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) { - - switch (redisSinkConfig.getSinkRedisDataType()) { - case LIST: - return new RedisListParser(protoParser, redisSinkConfig, statsDReporter); - case HASHSET: - return new RedisHashSetParser(protoToFieldMapper, protoParser, redisSinkConfig, statsDReporter); - case KEYVALUE: - return new RedisKeyValueParser(protoParser, redisSinkConfig, statsDReporter); - default: - return null; - } - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java deleted file mode 100644 index 45d3a7e06..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/ttl/DurationTtl.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - - -@AllArgsConstructor -@Getter -public class DurationTtl implements RedisTtl { - private int ttlInSeconds; - - @Override - public void setTtl(Pipeline jedisPipelined, String key) { - jedisPipelined.expire(key, ttlInSeconds); - } - - @Override - public void setTtl(JedisCluster jedisCluster, String key) { - jedisCluster.expire(key, ttlInSeconds); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java deleted file mode 100644 index 86e91ed84..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTtl.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - - -@AllArgsConstructor -@Getter -public class ExactTimeTtl implements RedisTtl { - private long unixTime; - - @Override - public void setTtl(Pipeline jedisPipelined, String key) { - jedisPipelined.expireAt(key, unixTime); - } - - @Override - public void setTtl(JedisCluster jedisCluster, String key) { - jedisCluster.expireAt(key, unixTime); - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java deleted file mode 100644 index ef36800f4..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/ttl/NoRedisTtl.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -public class NoRedisTtl implements RedisTtl { - @Override - public void setTtl(Pipeline jedisPipelined, String key) { - } - - @Override - public void setTtl(JedisCluster jedisCluster, String key) { - - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java deleted file mode 100644 index 2ca0c70fc..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTTLFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.exception.ConfigurationException; - -public class RedisTTLFactory { - - public static RedisTtl getTTl(RedisSinkConfig redisSinkConfig) { - long redisTTLValue = redisSinkConfig.getSinkRedisTtlValue(); - if (redisTTLValue < 0) { - throw new ConfigurationException("Provide a positive TTL value"); - } - switch (redisSinkConfig.getSinkRedisTtlType()) { - case EXACT_TIME: - return new ExactTimeTtl(redisTTLValue); - case DURATION: - return new DurationTtl((int) redisTTLValue); - default: - return new NoRedisTtl(); - } - } -} diff --git a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java b/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java deleted file mode 100644 index e994e6e57..000000000 --- a/src/main/java/io/odpf/firehose/sink/redis/ttl/RedisTtl.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -/** - * Interface for RedisTTL. - */ -public interface RedisTtl { - void setTtl(Pipeline jedisPipelined, String key); - - void setTtl(JedisCluster jedisCluster, String key); -} diff --git a/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java b/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java deleted file mode 100644 index 03b026109..000000000 --- a/src/test/java/io/odpf/firehose/config/RedisSinkDataTypeConverterTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.odpf.firehose.config; - -import io.odpf.firehose.config.converter.RedisSinkDataTypeConverter; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import org.gradle.internal.impldep.org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class RedisSinkDataTypeConverterTest { - - private RedisSinkDataTypeConverter redisSinkDataTypeConverter; - - @Before - public void setUp() { - redisSinkDataTypeConverter = new RedisSinkDataTypeConverter(); - } - - @Test - public void shouldReturnListSinkTypeFromLowerCaseInput() { - RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "list"); - Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); - } - - @Test - public void shouldReturnListSinkTypeFromUpperCaseInput() { - RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "LIST"); - Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); - } - - @Test - public void shouldReturnListSinkTypeFromMixedCaseInput() { - RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "LiSt"); - Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.LIST)); - } - - @Test - public void shouldReturnHashSetSinkTypeFromInput() { - RedisSinkDataType redisSinkDataType = redisSinkDataTypeConverter.convert(null, "hashset"); - Assert.assertTrue(redisSinkDataType.equals(RedisSinkDataType.HASHSET)); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowOnIllegalArgument() { - redisSinkDataTypeConverter.convert(null, ""); - } - -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java deleted file mode 100644 index a42bc7a14..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.odpf.firehose.sink.redis; - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.sink.AbstractSink; -import io.odpf.stencil.client.StencilClient; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class RedisSinkFactoryTest { - private Map configuration; - - @Mock - private StatsDReporter statsDReporter; - - @Mock - private StencilClient stencilClient; - - @Before - public void setUp() { - configuration = new HashMap<>(); - MockitoAnnotations.initMocks(this); - } - - @Test - public void shouldCreateRedisSink() { - configuration.put("SINK_REDIS_URLS", "localhost:6379"); - configuration.put("SINK_REDIS_KEY_TEMPLATE", "test_%%s,6"); - configuration.put("SINK_REDIS_LIST_DATA_PROTO_INDEX", "3"); - - AbstractSink sink = RedisSinkFactory.create(configuration, statsDReporter, stencilClient); - assertEquals(RedisSink.class, sink.getClass()); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java b/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java deleted file mode 100644 index 6b22704a7..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/RedisSinkTest.java +++ /dev/null @@ -1,98 +0,0 @@ -package io.odpf.firehose.sink.redis; - -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.client.RedisClient; -import io.odpf.firehose.sink.redis.exception.NoResponseException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import java.time.Instant; -import java.util.ArrayList; -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class RedisSinkTest { - @Mock - private RedisClient redisClient; - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - private RedisSink redis; - - @Before - public void setup() { - when(firehoseInstrumentation.startExecution()).thenReturn(Instant.now()); - redis = new RedisSink(firehoseInstrumentation, "redis", redisClient); - } - - @Test - public void shouldInvokeExecuteOnTheClient() { - redis.execute(); - - verify(redisClient).execute(); - } - - @Test - public void shouldInvokePrepareOnTheClient() { - ArrayList messages = new ArrayList<>(); - - redis.prepare(messages); - - verify(redisClient).prepare(messages); - } - - @Test - public void shouldInvokeCloseOnTheClient() { - redis.close(); - - verify(redisClient).close(); - } - - @Test - public void shouldLogWhenClosingConnection() { - redis.close(); - - verify(firehoseInstrumentation, times(1)).logInfo("Redis connection closing"); - } - - @Test - public void sendsMetricsForSuccessMessages() { - ArrayList messages = new ArrayList<>(); - - redis.pushMessage(messages); - - verify(firehoseInstrumentation, times(1)).capturePreExecutionLatencies(messages); - verify(firehoseInstrumentation, times(1)).startExecution(); - verify(firehoseInstrumentation, times(1)).logInfo("Preparing {} messages", messages.size()); - verify(firehoseInstrumentation, times(1)).captureSinkExecutionTelemetry(any(), any()); - InOrder inOrder = inOrder(firehoseInstrumentation); - inOrder.verify(firehoseInstrumentation).logInfo("Preparing {} messages", messages.size()); - inOrder.verify(firehoseInstrumentation).capturePreExecutionLatencies(messages); - inOrder.verify(firehoseInstrumentation).startExecution(); - inOrder.verify(firehoseInstrumentation).captureSinkExecutionTelemetry(any(), any()); - } - - @Test - public void sendsMetricsForFailedMessages() { - when(redisClient.execute()).thenThrow(new NoResponseException()); - ArrayList messages = new ArrayList<>(); - - redis.pushMessage(messages); - - verify(firehoseInstrumentation, times(1)).capturePreExecutionLatencies(messages); - verify(firehoseInstrumentation, times(1)).startExecution(); - verify(firehoseInstrumentation, times(1)).logInfo("Preparing {} messages", messages.size()); - verify(firehoseInstrumentation, times(1)).captureSinkExecutionTelemetry(any(), any()); - InOrder inOrder = inOrder(firehoseInstrumentation); - inOrder.verify(firehoseInstrumentation).logInfo("Preparing {} messages", messages.size()); - inOrder.verify(firehoseInstrumentation).capturePreExecutionLatencies(messages); - inOrder.verify(firehoseInstrumentation).startExecution(); - inOrder.verify(firehoseInstrumentation).captureSinkExecutionTelemetry(any(), any()); - } - - -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java deleted file mode 100644 index 8ba8c2ad2..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClientFactoryTest.java +++ /dev/null @@ -1,118 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkDeploymentType; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import io.odpf.firehose.config.enums.RedisSinkTtlType; -import io.odpf.firehose.exception.ConfigurationException; -import io.odpf.stencil.client.StencilClient; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class RedisClientFactoryTest { - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Mock - private RedisSinkConfig redisSinkConfig; - - @Mock - private StencilClient stencilClient; - - @Mock - private StatsDReporter statsDReporter; - - @Test - public void shouldGetStandaloneClient() { - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn("0.0.0.0:0"); - - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - RedisClient client = redisClientFactory.getClient(); - - Assert.assertEquals(RedisStandaloneClient.class, client.getClass()); - } - - @Test - public void shouldGetStandaloneClientWhenURLHasSpaces() { - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn(" 0.0.0.0:0 "); - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - RedisClient client = redisClientFactory.getClient(); - - Assert.assertEquals(RedisStandaloneClient.class, client.getClass()); - } - - @Test - public void shouldGetClusterClient() { - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn("0.0.0.0:0, 1.1.1.1:1"); - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - RedisClient client = redisClientFactory.getClient(); - - Assert.assertEquals(RedisClusterClient.class, client.getClass()); - } - - @Test - public void shouldGetClusterClientWhenURLHasSpaces() { - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn(" 0.0.0.0:0, 1.1.1.1:1 "); - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - RedisClient client = redisClientFactory.getClient(); - - Assert.assertEquals(RedisClusterClient.class, client.getClass()); - } - - @Test - public void shouldThrowExceptionWhenUrlIsInvalidForCluster() { - expectedException.expect(ConfigurationException.class); - expectedException.expectMessage("Invalid url(s) for redis cluster: localhost:6379,localhost:6378,localhost"); - - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.CLUSTER); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn("localhost:6379,localhost:6378,localhost"); - - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - redisClientFactory.getClient(); - } - - @Test - public void shouldThrowExceptionWhenUrlIsInvalidForStandalone() { - expectedException.expect(ConfigurationException.class); - expectedException.expectMessage("Invalid url for redis standalone: localhost"); - - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisDeploymentType()).thenReturn(RedisSinkDeploymentType.STANDALONE); - when(redisSinkConfig.getSinkRedisUrls()).thenReturn("localhost"); - - RedisClientFactory redisClientFactory = new RedisClientFactory(statsDReporter, redisSinkConfig, stencilClient); - - redisClientFactory.getClient(); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java b/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java deleted file mode 100644 index 75ea4e909..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/client/RedisClusterClientTest.java +++ /dev/null @@ -1,114 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; -import io.odpf.firehose.sink.redis.parsers.RedisParser; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.JedisCluster; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class RedisClusterClientTest { - @Mock - private StatsDReporter statsDReporter; - - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - - private final RedisHashSetFieldEntry firstRedisSetEntry = new RedisHashSetFieldEntry("key1", "field1", "value1", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); - private final RedisHashSetFieldEntry secondRedisSetEntry = new RedisHashSetFieldEntry("key2", "field2", "value2", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); - private final RedisListEntry firstRedisListEntry = new RedisListEntry("key1", "value1", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); - private final RedisListEntry secondRedisListEntry = new RedisListEntry("key2", "value2", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); - @Mock - private JedisCluster jedisCluster; - - @Mock - private RedisParser redisParser; - - @Mock - private RedisTtl redisTTL; - private List messages; - private RedisClusterClient redisClusterClient; - private ArrayList redisDataEntries; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - messages = Arrays.asList(new Message(new byte[0], new byte[0], "topic", 0, 100), - new Message(new byte[0], new byte[0], "topic", 0, 100)); - - redisClusterClient = new RedisClusterClient(firehoseInstrumentation, redisParser, redisTTL, jedisCluster); - - redisDataEntries = new ArrayList<>(); - - when(redisParser.parse(messages)).thenReturn(redisDataEntries); - } - - @Test - public void shouldParseEsbMessagesWhenPreparing() { - redisClusterClient.prepare(messages); - - verify(redisParser).parse(messages); - } - - @Test - public void shouldSendAllListDataWhenExecuting() { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - - redisClusterClient.prepare(messages); - redisClusterClient.execute(); - - verify(jedisCluster).lpush(firstRedisListEntry.getKey(), firstRedisListEntry.getValue()); - verify(jedisCluster).lpush(secondRedisListEntry.getKey(), secondRedisListEntry.getValue()); - } - - @Test - public void shouldSendAllSetDataWhenExecuting() { - populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); - - redisClusterClient.prepare(messages); - redisClusterClient.execute(); - - verify(jedisCluster).hset(firstRedisSetEntry.getKey(), firstRedisSetEntry.getField(), firstRedisListEntry.getValue()); - verify(jedisCluster).hset(secondRedisSetEntry.getKey(), secondRedisSetEntry.getField(), secondRedisListEntry.getValue()); - } - - @Test - public void shouldReturnEmptyArrayAfterExecuting() { - populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); - - redisClusterClient.prepare(messages); - List retryElements = redisClusterClient.execute(); - - Assert.assertEquals(0, retryElements.size()); - } - - @Test - public void shouldCloseTheJedisClient() { - redisClusterClient.close(); - - verify(firehoseInstrumentation, times(1)).logInfo("Closing Jedis client"); - verify(jedisCluster).close(); - } - - - private void populateRedisDataEntry(RedisDataEntry... redisData) { - redisDataEntries.addAll(Arrays.asList(redisData)); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java b/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java deleted file mode 100644 index 8bdc67c78..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/client/RedisStandaloneClientTest.java +++ /dev/null @@ -1,194 +0,0 @@ -package io.odpf.firehose.sink.redis.client; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.exception.DeserializerException; -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; -import io.odpf.firehose.sink.redis.exception.NoResponseException; -import io.odpf.firehose.sink.redis.parsers.RedisParser; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.Pipeline; -import redis.clients.jedis.Response; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class RedisStandaloneClientTest { - @Mock - private StatsDReporter statsDReporter; - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - - private final RedisHashSetFieldEntry firstRedisSetEntry = new RedisHashSetFieldEntry("key1", "field1", "value1", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); - private final RedisHashSetFieldEntry secondRedisSetEntry = new RedisHashSetFieldEntry("key2", "field2", "value2", new FirehoseInstrumentation(statsDReporter, RedisHashSetFieldEntry.class)); - private final RedisListEntry firstRedisListEntry = new RedisListEntry("key1", "value1", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); - private final RedisListEntry secondRedisListEntry = new RedisListEntry("key2", "value2", new FirehoseInstrumentation(statsDReporter, RedisListEntry.class)); - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private RedisClient redisClient; - private List messages; - private List redisDataEntries; - @Mock - private RedisParser redisMessageParser; - - @Mock - private RedisTtl redisTTL; - - @Mock - private Jedis jedis; - - @Mock - private Pipeline jedisPipeline; - - @Mock - private Response> responses; - - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - messages = Arrays.asList(new Message(new byte[0], new byte[0], "topic", 0, 100), - new Message(new byte[0], new byte[0], "topic", 0, 100)); - - redisClient = new RedisStandaloneClient(firehoseInstrumentation, redisMessageParser, redisTTL, jedis); - - redisDataEntries = new ArrayList<>(); - - when(jedis.pipelined()).thenReturn(jedisPipeline); - when(redisMessageParser.parse(messages)).thenReturn(redisDataEntries); - } - - @Test - public void pushesDataEntryForListInATransaction() throws DeserializerException { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - - redisClient.prepare(messages); - - verify(jedisPipeline, times(1)).multi(); - verify(jedisPipeline).lpush(firstRedisListEntry.getKey(), firstRedisListEntry.getValue()); - verify(jedisPipeline).lpush(secondRedisListEntry.getKey(), secondRedisListEntry.getValue()); - } - - @Test - public void setsTTLForListItemsInATransaction() throws DeserializerException { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - - redisClient.prepare(messages); - - verify(redisTTL).setTtl(jedisPipeline, firstRedisListEntry.getKey()); - verify(redisTTL).setTtl(jedisPipeline, secondRedisListEntry.getKey()); - } - - @Test - public void pushesDataEntryForSetInATransaction() throws DeserializerException { - populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); - - redisClient.prepare(messages); - - verify(jedisPipeline, times(1)).multi(); - verify(jedisPipeline).hset(firstRedisSetEntry.getKey(), firstRedisSetEntry.getField(), firstRedisSetEntry.getValue()); - verify(jedisPipeline).hset(secondRedisSetEntry.getKey(), secondRedisSetEntry.getField(), secondRedisSetEntry.getValue()); - } - - @Test - public void setsTTLForSetItemsInATransaction() throws DeserializerException { - populateRedisDataEntry(firstRedisSetEntry, secondRedisSetEntry); - - redisClient.prepare(messages); - - verify(redisTTL).setTtl(jedisPipeline, firstRedisSetEntry.getKey()); - verify(redisTTL).setTtl(jedisPipeline, secondRedisSetEntry.getKey()); - } - - @Test - public void shouldCompleteTransactionInExec() { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - when(jedisPipeline.exec()).thenReturn(responses); - when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); - - redisClient.prepare(messages); - redisClient.execute(); - - verify(jedisPipeline).exec(); - verify(firehoseInstrumentation, times(1)).logDebug("jedis responses: {}", responses); - } - - @Test - public void shouldWaitForResponseInExec() { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - when(jedisPipeline.exec()).thenReturn(responses); - when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); - - redisClient.prepare(messages); - redisClient.execute(); - - verify(jedisPipeline).sync(); - } - - @Test - public void shouldThrowExceptionWhenResponseIsNullInExec() { - expectedException.expect(NoResponseException.class); - - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - when(jedisPipeline.exec()).thenReturn(responses); - when(responses.get()).thenReturn(null); - - redisClient.prepare(messages); - redisClient.execute(); - } - - @Test - public void shouldThrowExceptionWhenResponseIsEmptyInExec() { - expectedException.expect(NoResponseException.class); - - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - when(jedisPipeline.exec()).thenReturn(responses); - when(responses.get()).thenReturn(new ArrayList<>()); - - redisClient.prepare(messages); - redisClient.execute(); - } - - @Test - public void shouldReturnEmptyArrayInExec() { - populateRedisDataEntry(firstRedisListEntry, secondRedisListEntry); - when(jedisPipeline.exec()).thenReturn(responses); - when(responses.get()).thenReturn(Collections.singletonList("MOCK_LIST_ITEM")); - - redisClient.prepare(messages); - List elementsToRetry = redisClient.execute(); - - Assert.assertEquals(0, elementsToRetry.size()); - } - - @Test - public void shouldCloseTheClient() { - redisClient.close(); - - verify(firehoseInstrumentation, times(1)).logInfo("Closing Jedis client"); - verify(jedis, times(1)).close(); - } - - - private void populateRedisDataEntry(RedisDataEntry... redisData) { - redisDataEntries.addAll(Arrays.asList(redisData)); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java deleted file mode 100644 index c2fb58ad0..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisHashSetFieldEntryTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.DurationTtl; -import io.odpf.firehose.sink.redis.ttl.ExactTimeTtl; -import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class RedisHashSetFieldEntryTest { - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - - @Mock - private Pipeline pipeline; - - @Mock - private JedisCluster jedisCluster; - - private RedisTtl redisTTL; - private RedisHashSetFieldEntry redisHashSetFieldEntry; - private InOrder inOrderPipeline; - private InOrder inOrderJedis; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - redisTTL = new NoRedisTtl(); - redisHashSetFieldEntry = new RedisHashSetFieldEntry("test-key", "test-field", "test-value", firehoseInstrumentation); - inOrderPipeline = Mockito.inOrder(pipeline); - inOrderJedis = Mockito.inOrder(jedisCluster); - } - - @Test - public void shouldIOnlyPushDataWithoutTTLByDefaultForPipeline() { - redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); - - verify(pipeline, times(1)).hset("test-key", "test-field", "test-value"); - verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); - verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } - - @Test - public void shouldSetProperTTLForExactTimeForPipeline() { - redisTTL = new ExactTimeTtl(1000L); - redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); - - inOrderPipeline.verify(pipeline, times(1)).hset("test-key", "test-field", "test-value"); - inOrderPipeline.verify(pipeline, times(1)).expireAt("test-key", 1000L); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } - - @Test - public void shouldSetProperTTLForDurationForPipeline() { - redisTTL = new DurationTtl(1000); - redisHashSetFieldEntry.pushMessage(pipeline, redisTTL); - - inOrderPipeline.verify(pipeline, times(1)).hset("test-key", "test-field", "test-value"); - inOrderPipeline.verify(pipeline, times(1)).expire("test-key", 1000); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } - - @Test - public void shouldIOnlyPushDataWithoutTTLByDefaultForCluster() { - redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); - - verify(jedisCluster, times(1)).hset("test-key", "test-field", "test-value"); - verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); - verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } - - @Test - public void shouldSetProperTTLForExactTimeForCluster() { - redisTTL = new ExactTimeTtl(1000L); - redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); - - inOrderJedis.verify(jedisCluster, times(1)).hset("test-key", "test-field", "test-value"); - inOrderJedis.verify(jedisCluster, times(1)).expireAt("test-key", 1000L); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } - - @Test - public void shouldSetProperTTLForDuration() { - redisTTL = new DurationTtl(1000); - redisHashSetFieldEntry.pushMessage(jedisCluster, redisTTL); - - inOrderJedis.verify(jedisCluster, times(1)).hset("test-key", "test-field", "test-value"); - inOrderJedis.verify(jedisCluster, times(1)).expire("test-key", 1000); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, field: {}, value: {}", "test-key", "test-field", "test-value"); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java deleted file mode 100644 index 31f0f2a46..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisKeyValueEntryTest.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.DurationTtl; -import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -import static org.mockito.Mockito.*; - -public class RedisKeyValueEntryTest { - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - - @Mock - private Pipeline pipeline; - - @Mock - private JedisCluster jedisCluster; - - private InOrder inOrderPipeline; - private InOrder inOrderJedis; - - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - inOrderPipeline = Mockito.inOrder(pipeline); - inOrderJedis = Mockito.inOrder(jedisCluster); - - } - - @Test - public void pushMessageWithNoTtl() { - String key = "key"; - String value = "value"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(pipeline, new NoRedisTtl()); - inOrderPipeline.verify(pipeline, times(1)).set(key, value); - inOrderPipeline.verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); - - } - - @Test - public void pushMessageWithTtl() { - String key = "key"; - String value = "value"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100)); - inOrderPipeline.verify(pipeline, times(1)).set(key, value); - inOrderPipeline.verify(pipeline, times(1)).expire(key, 100); - } - - @Test - public void pushMessageVerifyInstrumentation() { - String key = "this-key"; - String value = "john"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(pipeline, new DurationTtl(100)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", key, value); - } - - - @Test - public void pushMessageWithNoTtlUsingJedisCluster() { - String key = "key"; - String value = "value"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(jedisCluster, new NoRedisTtl()); - inOrderJedis.verify(jedisCluster, times(1)).set(key, value); - inOrderJedis.verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); - - } - - @Test - public void pushMessageWithTtlUsingJedisCluster() { - String key = "key"; - String value = "value"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100)); - inOrderJedis.verify(jedisCluster, times(1)).set(key, value); - inOrderJedis.verify(jedisCluster, times(1)).expire(key, 100); - } - - @Test - public void pushMessageVerifyInstrumentationUsingJedisCluster() { - String key = "this-key"; - String value = "john"; - RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(key, value, firehoseInstrumentation); - redisKeyValueEntry.pushMessage(jedisCluster, new DurationTtl(100)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", key, value); - } - -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java b/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java deleted file mode 100644 index d4583f633..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/dataentry/RedisListEntryTest.java +++ /dev/null @@ -1,94 +0,0 @@ -package io.odpf.firehose.sink.redis.dataentry; - -import io.odpf.firehose.metrics.FirehoseInstrumentation; -import io.odpf.firehose.sink.redis.ttl.DurationTtl; -import io.odpf.firehose.sink.redis.ttl.ExactTimeTtl; -import io.odpf.firehose.sink.redis.ttl.NoRedisTtl; -import io.odpf.firehose.sink.redis.ttl.RedisTtl; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -import static org.mockito.Mockito.*; - -@RunWith(MockitoJUnitRunner.class) -public class RedisListEntryTest { - - @Mock - private FirehoseInstrumentation firehoseInstrumentation; - - @Mock - private Pipeline pipeline; - - @Mock - private JedisCluster jedisCluster; - - private RedisTtl redisTTL; - private RedisListEntry redisListEntry; - - @Before - public void setup() { - redisTTL = new NoRedisTtl(); - redisListEntry = new RedisListEntry("test-key", "test-value", firehoseInstrumentation); - } - - @Test - public void shouldIOnlyPushDataWithoutTTLByDefaultForPipeline() { - redisListEntry.pushMessage(pipeline, redisTTL); - - verify(pipeline, times(1)).lpush("test-key", "test-value"); - verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); - verify(pipeline, times(0)).expireAt(any(String.class), any(Long.class)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } - - @Test - public void shouldSetProperTTLForExactTimeForPipeline() { - redisTTL = new ExactTimeTtl(1000L); - redisListEntry.pushMessage(pipeline, redisTTL); - - verify(pipeline, times(1)).expireAt("test-key", 1000L); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } - - @Test - public void shouldSetProperTTLForDurationForPipeline() { - redisTTL = new DurationTtl(1000); - redisListEntry.pushMessage(pipeline, redisTTL); - - verify(pipeline, times(1)).expire("test-key", 1000); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } - - @Test - public void shouldIOnlyPushDataWithoutTTLByDefaultForCluster() { - redisListEntry.pushMessage(jedisCluster, redisTTL); - - verify(jedisCluster, times(1)).lpush("test-key", "test-value"); - verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); - verify(jedisCluster, times(0)).expireAt(any(String.class), any(Long.class)); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } - - @Test - public void shouldSetProperTTLForExactTimeForCluster() { - redisTTL = new ExactTimeTtl(1000L); - redisListEntry.pushMessage(jedisCluster, redisTTL); - - verify(jedisCluster, times(1)).expireAt("test-key", 1000L); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } - - @Test - public void shouldSetProperTTLForDurationForCluster() { - redisTTL = new DurationTtl(1000); - redisListEntry.pushMessage(jedisCluster, redisTTL); - - verify(jedisCluster, times(1)).expire("test-key", 1000); - verify(firehoseInstrumentation, times(1)).logDebug("key: {}, value: {}", "test-key", "test-value"); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java deleted file mode 100644 index 407425813..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisHashSetParserTest.java +++ /dev/null @@ -1,336 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.consumer.TestKey; -import io.odpf.firehose.consumer.TestMessage; -import io.odpf.firehose.consumer.TestBookingLogMessage; -import io.odpf.firehose.consumer.TestNestedRepeatedMessage; -import io.odpf.firehose.proto.ProtoToFieldMapper; -import io.odpf.firehose.sink.redis.dataentry.RedisHashSetFieldEntry; -import io.odpf.stencil.client.ClassLoadStencilClient; -import io.odpf.stencil.Parser; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import java.util.IllegalFormatConversionException; -import java.util.Properties; -import java.util.UnknownFormatConversionException; - -import static junit.framework.TestCase.assertEquals; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class RedisHashSetParserTest { - - private final long bookingCustomerTotalFare = 2000L; - private final float bookingAmountPaidByCash = 12.3F; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Mock - private RedisSinkConfig redisSinkConfig; - - @Mock - private StatsDReporter statsDReporter; - - private Message message; - private Parser testKeyProtoParser; - private Parser testMessageProtoParser; - private ClassLoadStencilClient stencilClient; - private Message bookingMessage; - private Parser bookingMessageProtoParser; - private String bookingOrderNumber = "booking-order-1"; - - @Before - public void setUp() throws Exception { - - TestKey testKey = TestKey.newBuilder().setOrderNumber("ORDER-1-FROM-KEY").build(); - TestBookingLogMessage testBookingLogMessage = TestBookingLogMessage.newBuilder().setOrderNumber(bookingOrderNumber).setCustomerTotalFareWithoutSurge(bookingCustomerTotalFare).setAmountPaidByCash(bookingAmountPaidByCash).build(); - TestMessage testMessage = TestMessage.newBuilder().setOrderNumber("test-order").setOrderDetails("ORDER-DETAILS").build(); - this.message = new Message(testKey.toByteArray(), testMessage.toByteArray(), "test", 1, 11); - this.bookingMessage = new Message(testKey.toByteArray(), testBookingLogMessage.toByteArray(), "test", 1, 11); - stencilClient = new ClassLoadStencilClient(); - testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); - bookingMessageProtoParser = stencilClient.getParser(TestBookingLogMessage.class.getCanonicalName()); - testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); - } - - private void setRedisSinkConfig(String parserMode, String collectionKeyTemplate, RedisSinkDataType redisSinkDataType) { - when(redisSinkConfig.getKafkaRecordParserMode()).thenReturn(parserMode); - when(redisSinkConfig.getSinkRedisKeyTemplate()).thenReturn(collectionKeyTemplate); - } - - @Test - public void shouldParseStringMessageForCollectionKeyTemplate() { - setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForTestMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTestMessage, testMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); - - assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); - assertEquals("details", redisHashSetFieldEntry.getField()); - assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); - } - - @Test - public void shouldParseStringMessageWithSpacesForCollectionKeyTemplate() { - setRedisSinkConfig("message", "Test-%s, 1", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForTestMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTestMessage, testMessageProtoParser, redisSinkConfig, statsDReporter); - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); - - assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); - assertEquals("details", redisHashSetFieldEntry.getField()); - assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); - } - - - @Test - public void shouldParseFloatMessageForCollectionKeyTemplate() { - setRedisSinkConfig("message", "Test-%.2f,16", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals("Test-12.30", redisHashSetFieldEntry.getKey()); - assertEquals("order_number_1", redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldParseLongMessageForCollectionKeyTemplate() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals("order_number_1", redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldThrowExceptionForInvalidPatternInCollectionKeyTemplate() { - expectedException.expect(UnknownFormatConversionException.class); - expectedException.expectMessage("Conversion = '%'"); - - setRedisSinkConfig("message", "Test-%,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldThrowExceptionForIncompatiblePatternInCollectionKeyTemplate() { - expectedException.expect(IllegalFormatConversionException.class); - expectedException.expectMessage("f != java.lang.Long"); - - setRedisSinkConfig("message", "Test-%f,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldThrowExceptionForNonExistingDescriptorInCollectionKeyTemplate() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Descriptor not found for index: 20000"); - - setRedisSinkConfig("message", "Test-%f,20000", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldThrowExceptionForNullCollectionKeyTemplate() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Template 'null' is invalid"); - - setRedisSinkConfig("message", null, RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldThrowExceptionForEmptyCollectionKeyTemplate() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Template '' is invalid"); - - setRedisSinkConfig("message", "", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldAcceptStringForCollectionKey() { - setRedisSinkConfig("message", "Test", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - assertEquals("Test", redisHashSetFieldEntry.getKey()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldAcceptStringWithPatternForCollectionKeyWithEmptyVariables() { - setRedisSinkConfig("message", "Test-%s", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_1")); - - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - assertEquals("Test-%s", redisHashSetFieldEntry.getKey()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldParseLongMessageForKey() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%d,52")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals(String.format("order_number_%s", bookingCustomerTotalFare), redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldParseLongMessageWithSpaceForKey() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%d, 52")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals(String.format("order_number_%s", bookingCustomerTotalFare), redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldParseStringMessageForKey() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number_%s,2")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals(String.format("order_number_%s", bookingOrderNumber), redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldHandleStaticStringForKey() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals("order_number", redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldHandleStaticStringWithPatternForKey() { - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number%s")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - assertEquals("Test-2000", redisHashSetFieldEntry.getKey()); - assertEquals("order_number%s", redisHashSetFieldEntry.getField()); - assertEquals("booking-order-1", redisHashSetFieldEntry.getValue()); - } - - @Test - public void shouldThrowErrorForInvalidFormatForKey() { - expectedException.expect(UnknownFormatConversionException.class); - expectedException.expectMessage("Conversion = '%"); - - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number-%,52")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldThrowErrorForIncompatibleFormatForKey() { - expectedException.expect(IllegalFormatConversionException.class); - expectedException.expectMessage("d != java.lang.String"); - - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "order_number-%d,2")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(message).get(0); - - assertEquals("ORDER-DETAILS", redisHashSetFieldEntry.getValue()); - assertEquals("details", redisHashSetFieldEntry.getField()); - assertEquals("Test-test-order", redisHashSetFieldEntry.getKey()); - } - - @Test - public void shouldThrowExceptionForEmptyKey() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Template '' is invalid"); - - setRedisSinkConfig("message", "Test-%d,52", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForBookingMessage = new ProtoToFieldMapper(testMessageProtoParser, getProperties("2", "")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForBookingMessage, bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(bookingMessage); - } - - @Test - public void shouldParseKeyWhenKafkaMessageParseModeSetToKey() { - setRedisSinkConfig("key", "Test-%s,1", RedisSinkDataType.HASHSET); - ProtoToFieldMapper protoToFieldMapperForKey = new ProtoToFieldMapper(testKeyProtoParser, getProperties("1", "order")); - - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForKey, testKeyProtoParser, redisSinkConfig, statsDReporter); - RedisHashSetFieldEntry redisHashSetFieldEntry = (RedisHashSetFieldEntry) redisMessageParser.parse(bookingMessage).get(0); - - assertEquals(redisHashSetFieldEntry.getValue(), "ORDER-1-FROM-KEY"); - } - - @Test(expected = IllegalArgumentException.class) - public void shouldThrowInvalidProtocolBufferExceptionWhenIncorrectProtocolUsed() { - setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.HASHSET); - Parser protoParserForTest = stencilClient.getParser(TestNestedRepeatedMessage.class.getCanonicalName()); - ProtoToFieldMapper protoToFieldMapperForTest = new ProtoToFieldMapper(protoParserForTest, getProperties("3", "details")); - RedisParser redisMessageParser = new RedisHashSetParser(protoToFieldMapperForTest, protoParserForTest, redisSinkConfig, statsDReporter); - - redisMessageParser.parse(message); - } - - private Properties getProperties(String s, String order) { - Properties propertiesForKey = new Properties(); - propertiesForKey.setProperty(s, order); - return propertiesForKey; - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java deleted file mode 100644 index 3a414b2d2..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisKeyValueParserTest.java +++ /dev/null @@ -1,97 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.consumer.TestKey; -import io.odpf.firehose.consumer.TestMessage; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.sink.redis.dataentry.RedisDataEntry; -import io.odpf.firehose.sink.redis.dataentry.RedisKeyValueEntry; -import io.odpf.stencil.Parser; -import io.odpf.stencil.client.ClassLoadStencilClient; -import io.odpf.stencil.client.StencilClient; -import org.junit.Test; -import org.junit.jupiter.api.Assertions; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.util.Arrays.asList; -import static java.util.Collections.singletonMap; -import static org.aeonbits.owner.ConfigFactory.create; -import static org.junit.Assert.assertEquals; - -public class RedisKeyValueParserTest { - - private final byte[] testKeyByteArr = TestKey.newBuilder() - .setOrderNumber("order-2") - .setOrderUrl("order-url-world") - .build() - .toByteArray(); - private StatsDReporter statsDReporter; - private StencilClient stencilClient = new ClassLoadStencilClient(); - private Parser testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); - private Parser testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); - - @Test - public void parse() { - Map config = new HashMap() {{ - put("KAFKA_RECORD_PARSER_MODE", "message"); - put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1"); - put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "3"); - }}; - RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); - RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testMessageProtoParser, redisSinkConfig, statsDReporter); - byte[] logMessage = TestMessage.newBuilder() - .setOrderNumber("xyz-order") - .setOrderDetails("new-eureka-order") - .build() - .toByteArray(); - Message message = new Message(null, logMessage, "test-topic", 1, 100); - List redisDataEntries = redisKeyValueParser.parse(message); - - RedisKeyValueEntry expectedEntry = new RedisKeyValueEntry("hello_world_xyz-order", "new-eureka-order", null); - assertEquals(asList(expectedEntry), redisDataEntries); - - } - - @Test - public void shouldParseWhenUsingModeKey() { - Map config = new HashMap() {{ - put("KAFKA_RECORD_PARSER_MODE", "key"); - put("SINK_REDIS_KEY_TEMPLATE", "hello_world_%%s,1"); - put("SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX", "2"); - }}; - RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); - - RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); - Message message = new Message(testKeyByteArr, null, null, 0, 0L); - redisKeyValueParser.parse(message); - } - - @Test - public void shouldThrowExceptionWhenKeyTemplateIsEmpty() { - - Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0); - RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, singletonMap("SINK_REDIS_KEY_TEMPLATE", "")); - RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); - IllegalArgumentException illegalArgumentException = - Assertions.assertThrows(IllegalArgumentException.class, () -> redisKeyValueParser.parse(message)); - assertEquals("Template '' is invalid", illegalArgumentException.getMessage()); - } - - @Test - public void shouldThrowExceptionForNoListProtoIndex() { - HashMap config = new HashMap() {{ - put("SINK_REDIS_KEY_TEMPLATE", "hello_world%%s,1"); - }}; - RedisSinkConfig redisSinkConfig = create(RedisSinkConfig.class, config); - - Message message = new Message(testKeyByteArr, testKeyByteArr, "", 0, 0); - RedisKeyValueParser redisKeyValueParser = new RedisKeyValueParser(testKeyProtoParser, redisSinkConfig, null); - IllegalArgumentException illegalArgumentException = Assertions.assertThrows(IllegalArgumentException.class, - () -> redisKeyValueParser.parse(message)); - assertEquals("Please provide SINK_REDIS_KEY_VALUE_DATA_PROTO_INDEX in key value sink", illegalArgumentException.getMessage()); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java deleted file mode 100644 index c2856efa2..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisListParserTest.java +++ /dev/null @@ -1,107 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import io.odpf.firehose.message.Message; -import io.odpf.firehose.consumer.TestBookingLogMessage; -import io.odpf.firehose.consumer.TestKey; -import io.odpf.firehose.consumer.TestMessage; -import io.odpf.firehose.sink.redis.dataentry.RedisListEntry; -import io.odpf.stencil.client.ClassLoadStencilClient; -import io.odpf.stencil.Parser; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static junit.framework.TestCase.assertEquals; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class RedisListParserTest { - private final long bookingCustomerTotalFare = 2000L; - private final float bookingAmountPaidByCash = 12.3F; - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Mock - private RedisSinkConfig redisSinkConfig; - - @Mock - private StatsDReporter statsDReporter; - - private Message message; - private Parser testKeyProtoParser; - private Parser testMessageProtoParser; - private ClassLoadStencilClient stencilClient; - private Message bookingMessage; - private Parser bookingMessageProtoParser; - private String bookingOrderNumber = "booking-order-1"; - - @Before - public void setUp() throws Exception { - - TestKey testKey = TestKey.newBuilder().setOrderNumber("ORDER-1-FROM-KEY").build(); - TestBookingLogMessage testBookingLogMessage = TestBookingLogMessage.newBuilder().setOrderNumber(bookingOrderNumber).setCustomerTotalFareWithoutSurge(bookingCustomerTotalFare).setAmountPaidByCash(bookingAmountPaidByCash).build(); - TestMessage testMessage = TestMessage.newBuilder().setOrderNumber("test-order").setOrderDetails("ORDER-DETAILS").build(); - this.message = new Message(testKey.toByteArray(), testMessage.toByteArray(), "test", 1, 11); - this.bookingMessage = new Message(testKey.toByteArray(), testBookingLogMessage.toByteArray(), "test", 1, 11); - stencilClient = new ClassLoadStencilClient(); - testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); - bookingMessageProtoParser = stencilClient.getParser(TestBookingLogMessage.class.getCanonicalName()); - testKeyProtoParser = stencilClient.getParser(TestKey.class.getCanonicalName()); - } - - private void setRedisSinkConfig(String parserMode, String collectionKeyTemplate, RedisSinkDataType redisSinkDataType) { - when(redisSinkConfig.getKafkaRecordParserMode()).thenReturn(parserMode); - when(redisSinkConfig.getSinkRedisKeyTemplate()).thenReturn(collectionKeyTemplate); - when(redisSinkConfig.getSinkRedisListDataProtoIndex()).thenReturn("1"); - } - - @Test - public void shouldParseStringMessageForCollectionKeyTemplateInList() { - setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.LIST); - RedisParser redisParser = new RedisListParser(testMessageProtoParser, redisSinkConfig, statsDReporter); - - RedisListEntry redisListEntry = (RedisListEntry) redisParser.parse(message).get(0); - - assertEquals("test-order", redisListEntry.getValue()); - assertEquals("Test-test-order", redisListEntry.getKey()); - } - - @Test - public void shouldParseKeyWhenKafkaMessageParseModeSetToKey() { - setRedisSinkConfig("key", "Test-%s,1", RedisSinkDataType.LIST); - - RedisParser redisParser = new RedisListParser(testKeyProtoParser, redisSinkConfig, statsDReporter); - RedisListEntry redisListEntry = (RedisListEntry) redisParser.parse(bookingMessage).get(0); - - assertEquals(redisListEntry.getValue(), "ORDER-1-FROM-KEY"); - } - - @Test - public void shouldThrowExceptionForEmptyKey() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Template '' is invalid"); - - setRedisSinkConfig("message", "", RedisSinkDataType.LIST); - RedisParser redisParser = new RedisListParser(bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisParser.parse(bookingMessage); - } - - @Test - public void shouldThrowExceptionForNoListProtoIndex() { - setRedisSinkConfig("message", "Test-%s,1", RedisSinkDataType.LIST); - when(redisSinkConfig.getSinkRedisListDataProtoIndex()).thenReturn(null); - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("Please provide SINK_REDIS_LIST_DATA_PROTO_INDEX in list sink"); - - RedisParser redisParser = new RedisListParser(bookingMessageProtoParser, redisSinkConfig, statsDReporter); - - redisParser.parse(bookingMessage); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java deleted file mode 100644 index 0fc00930b..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/parsers/RedisParserFactoryTest.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.odpf.firehose.sink.redis.parsers; - - -import io.odpf.depot.metrics.StatsDReporter; -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkDataType; -import io.odpf.firehose.consumer.TestMessage; -import io.odpf.firehose.proto.ProtoToFieldMapper; -import io.odpf.stencil.Parser; -import io.odpf.stencil.client.ClassLoadStencilClient; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.when; - - -@RunWith(MockitoJUnitRunner.class) -public class RedisParserFactoryTest { - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - @Mock - private RedisSinkConfig redisSinkConfig; - - @Mock - private StatsDReporter statsDReporter; - - private ClassLoadStencilClient stencilClient; - private ProtoToFieldMapper protoToFieldMapper; - private Parser testMessageProtoParser; - - - @Before - public void setUp() throws Exception { - stencilClient = new ClassLoadStencilClient(); - testMessageProtoParser = stencilClient.getParser(TestMessage.class.getCanonicalName()); - protoToFieldMapper = new ProtoToFieldMapper(testMessageProtoParser, getProperties("3", "details")); - } - - private void setRedisSinkConfig(RedisSinkDataType redisSinkDataType) { - when(redisSinkConfig.getSinkRedisDataType()).thenReturn(redisSinkDataType); - } - - @Test - public void shouldReturnNewRedisListParser() { - setRedisSinkConfig(RedisSinkDataType.LIST); - - RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); - - assertEquals(RedisListParser.class, parser.getClass()); - } - - @Test - public void shouldReturnNewRedisHashSetParser() { - setRedisSinkConfig(RedisSinkDataType.HASHSET); - - RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); - - assertEquals(RedisHashSetParser.class, parser.getClass()); - } - - @Test - public void shouldReturnNewRedisKeyValueParser() { - setRedisSinkConfig(RedisSinkDataType.KEYVALUE); - - RedisParser parser = RedisParserFactory.getParser(protoToFieldMapper, testMessageProtoParser, redisSinkConfig, statsDReporter); - - assertEquals(RedisKeyValueParser.class, parser.getClass()); - } - - private Properties getProperties(String s, String order) { - Properties propertiesForKey = new Properties(); - propertiesForKey.setProperty(s, order); - return propertiesForKey; - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java deleted file mode 100644 index 2ff62c958..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/ttl/DurationTTLTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -@RunWith(MockitoJUnitRunner.class) -public class DurationTTLTest { - - private DurationTtl durationTTL; - - @Mock - private Pipeline pipeline; - - @Mock - private JedisCluster jedisCluster; - - @Before - public void setup() { - durationTTL = new DurationTtl(10); - } - - @Test - public void shouldSetTTLInSecondsForPipeline() { - durationTTL.setTtl(pipeline, "test-key"); - verify(pipeline, times(1)).expire("test-key", 10); - } - - @Test - public void shouldSetTTLInSecondsForCluster() { - durationTTL.setTtl(jedisCluster, "test-key"); - verify(jedisCluster, times(1)).expire("test-key", 10); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java deleted file mode 100644 index ba2475cd8..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/ttl/ExactTimeTTLTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import redis.clients.jedis.JedisCluster; -import redis.clients.jedis.Pipeline; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.MockitoAnnotations.initMocks; - -public class ExactTimeTTLTest { - - private ExactTimeTtl exactTimeTTL; - @Mock - private Pipeline pipeline; - - @Mock - private JedisCluster jedisCluster; - - @Before - public void setup() { - initMocks(this); - exactTimeTTL = new ExactTimeTtl(10000000L); - } - - @Test - public void shouldSetUnixTimeStampAsTTLForPipeline() { - exactTimeTTL.setTtl(pipeline, "test-key"); - verify(pipeline, times(1)).expireAt("test-key", 10000000L); - } - - @Test - public void shouldSetUnixTimeStampAsTTLForCluster() { - exactTimeTTL.setTtl(jedisCluster, "test-key"); - verify(jedisCluster, times(1)).expireAt("test-key", 10000000L); - } -} diff --git a/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java b/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java deleted file mode 100644 index 2b316cc9f..000000000 --- a/src/test/java/io/odpf/firehose/sink/redis/ttl/RedisTtlFactoryTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package io.odpf.firehose.sink.redis.ttl; - -import io.odpf.firehose.config.RedisSinkConfig; -import io.odpf.firehose.config.enums.RedisSinkTtlType; -import io.odpf.firehose.exception.ConfigurationException; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; - -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -public class RedisTtlFactoryTest { - - @Mock - private RedisSinkConfig redisSinkConfig; - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setup() { - initMocks(this); - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DISABLE); - } - - @Test - public void shouldReturnNoTTLIfNothingGiven() { - RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); - Assert.assertEquals(redisTTL.getClass(), NoRedisTtl.class); - } - - @Test - public void shouldReturnExactTimeTTL() { - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.EXACT_TIME); - when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(100L); - RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); - Assert.assertEquals(redisTTL.getClass(), ExactTimeTtl.class); - } - - @Test - public void shouldReturnDurationTTL() { - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(100L); - RedisTtl redisTTL = RedisTTLFactory.getTTl(redisSinkConfig); - Assert.assertEquals(redisTTL.getClass(), DurationTtl.class); - } - - @Test - public void shouldThrowExceptionInCaseOfInvalidConfiguration() { - expectedException.expect(ConfigurationException.class); - expectedException.expectMessage("Provide a positive TTL value"); - - when(redisSinkConfig.getSinkRedisTtlType()).thenReturn(RedisSinkTtlType.DURATION); - when(redisSinkConfig.getSinkRedisTtlValue()).thenReturn(-1L); - RedisTTLFactory.getTTl(redisSinkConfig); - } -}