[CCLOG-1916] Adding timeout support for generator.generate() #112

Status: Open. Wants to merge 27 commits into base: master.
Commits (27, all authored by garrix-fan):

- e6f1878: Resolving indentation (Aug 2, 2022)
- b5ef979: Changing modifier for logger (Aug 2, 2022)
- ee05caa: Adding timeout config and doc (Aug 2, 2022)
- b38ac7c: Added executor for timeout bound call to generator.generate() (Aug 2, 2022)
- 98256f0: Added DatagenException (Aug 2, 2022)
- 2271315: Renamed DatagenException to DatagenTaskException (Aug 2, 2022)
- 1e0ee07: Changed modifier for DatagenTaskException (Aug 2, 2022)
- 34d0e69: Removing unused variable in start() (Aug 2, 2022)
- f2847f2: Removing unused import (Aug 2, 2022)
- 0c0f43a: Replacing DatagenTaskException with ConnectException (Aug 2, 2022)
- eaebfdc: Replacing fully qualified names with class name (Aug 2, 2022)
- e26d4e9: Minor refactoring (Aug 2, 2022)
- 1b7592b: Removed unused methods (Aug 2, 2022)
- eb4660d: Removed unused imports (Aug 2, 2022)
- 4faaaf5: Resolving checkstyle errors (Aug 2, 2022)
- 0be2c79: Making configValues immmutable set (Aug 2, 2022)
- 47d95e9: Minor refactoring (Aug 2, 2022)
- adb4e35: Added Unit Tests for timeout and stackoverflow error (Aug 2, 2022)
- d069b14: Added test resource files (Aug 2, 2022)
- 9c3b0f1: Added newline (Aug 2, 2022)
- 9610758: Added validator to generate.timeout (Aug 3, 2022)
- fe81918: Added null check for generate.timeout validator (Aug 3, 2022)
- 9728f7e: Minor refactoring (Aug 3, 2022)
- b663c97: Added Unit Tests for generate.timeout config (Aug 3, 2022)
- 148cfa5: Added shutdown for the executor (Aug 4, 2022)
- b99be5a: Cancelled future before throwing ConnectException (Aug 4, 2022)
- 2d40594: Removing Exception from method signature (Aug 10, 2022)
Files changed

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java
@@ -49,6 +49,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. "
+ "Two connectors started with the same seed will deterministically produce the same data. "
+ "Each task will generate different data than the other tasks in the same connector.";
public static final String GENERATE_TIMEOUT_CONF = "generate.timeout";
private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message "
+ "generation. This timeout can be configured for up to 1 minute, i.e. 60000 ms";

public DatagenConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
@@ -90,7 +93,17 @@ public static ConfigDef conf() {
Importance.HIGH,
QUICKSTART_DOC
)
.define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
.define(RANDOM_SEED_CONF,
Type.LONG,
null,
Importance.LOW,
RANDOM_SEED_DOC)
.define(GENERATE_TIMEOUT_CONF,
Type.LONG,
Review thread on the default value for generate.timeout:

[Member] Can we define a default value?

[Member] For backward compatibility of already deployed connectors that don't use this config value, it's better to leave the default as null.

[Member] Why don't we use ConfigDef.NO_DEFAULT_VALUE here?

[Contributor]
> For backward compatibility of already deployed connectors that don't use this config value, it's better to leave the default as null.

If there were a default value, say 1000, it would simply be applied to already deployed connectors on upgrade.

> Why don't we use ConfigDef.NO_DEFAULT_VALUE here?

That implies the config has no default value and must be specified, which would actually be backward incompatible: existing connectors would fail on upgrade.

[Member Author, @garrix-fan, Aug 10, 2022]
> If there were a default value, say 1000, it would simply be applied to already deployed connectors on upgrade.

But what if someone has a regex that takes more time than this default value?

[Member] So let's go ahead with the null default value in that case. For existing connectors we retain the existing behavior unless the config is explicitly specified.

[Contributor] @garrix-fan that makes sense; I was just trying to say that we can't use "no default value", and null is fine.
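A minimal sketch of the two alternatives discussed in this thread, assuming only Kafka's public ConfigDef API; it is illustrative and not part of the PR's diff, which resumes immediately below.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

class TimeoutDefaultSketch {
  static final String GENERATE_TIMEOUT_CONF = "generate.timeout";

  // Option taken in this PR: a null default, so connectors deployed before the
  // upgrade keep today's behavior (no timeout is applied unless the key is set).
  static ConfigDef withNullDefault() {
    return new ConfigDef()
        .define(GENERATE_TIMEOUT_CONF, Type.LONG, null, Importance.LOW,
            "Timeout in ms for message generation; null leaves generation unbounded.");
  }

  // Rejected option: NO_DEFAULT_VALUE marks the config as required, so already
  // deployed connectors that never set it would fail validation after an upgrade.
  static ConfigDef withNoDefaultValue() {
    return new ConfigDef()
        .define(GENERATE_TIMEOUT_CONF, Type.LONG, ConfigDef.NO_DEFAULT_VALUE,
            Importance.LOW, "Timeout in ms for message generation (must be set).");
  }
}

With the null default chosen here, poll() only wraps generate() in a bounded wait when the operator has opted in, which is exactly the backward-compatible behavior the thread settles on.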

null,
new GenerateTimeoutValidator(),
Importance.LOW,
GENERATE_TIMEOUT_DOC);
}

public String getKafkaTopic() {
@@ -131,6 +144,10 @@ public String getSchemaString() {
return this.getString(SCHEMA_STRING_CONF);
}

public Long getGenerateTimeout() {
return this.getLong(GENERATE_TIMEOUT_CONF);
}

public Schema getSchema() {
String quickstart = getQuickstart();
if (quickstart != null && !quickstart.isEmpty()) {
@@ -163,11 +180,11 @@ public void ensureValid(String name, Object value) {
if (((String) value).isEmpty()) {
return;
}
if (!Quickstart.configValues.contains(((String) value).toLowerCase())) {
if (!Quickstart.configValues().contains(((String) value).toLowerCase())) {
throw new ConfigException(String.format(
"%s must be one out of %s",
name,
String.join(",", DatagenTask.Quickstart.configValues)
String.join(",", DatagenTask.Quickstart.configValues())
));
}
}
@@ -194,5 +211,20 @@ public void ensureValid(String name, Object value) {
ConfigUtils.getSchemaFromSchemaFileName((String) value);
}
}

private static class GenerateTimeoutValidator implements Validator {

@Override
public void ensureValid(String name, Object value) {
if (value == null) {
return;
}
long longValue = (Long) value;
if (longValue > 0 && longValue <= 60000L) {
[Member] One minute sounds like a reasonable limit, but should we allow a bigger range than this? We could measure the average time it takes to generate a full payload for the existing quickstart schemas.

return;
}
throw new ConfigException(name + " must be in the range [1, 60000] ms");
}
}
}
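As an aside for readers of this diff: a small usage sketch of the new config, assuming the connector's existing kafka.topic and quickstart keys stay unchanged. The class and method names come from this file, but the example itself is illustrative and not part of the PR.

import io.confluent.kafka.connect.datagen.DatagenConnectorConfig;
import java.util.HashMap;
import java.util.Map;

class GenerateTimeoutUsageSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("kafka.topic", "users");     // existing connector key (assumed unchanged)
    props.put("quickstart", "users");      // existing connector key (assumed unchanged)
    props.put("generate.timeout", "5000"); // new key from this PR; bounds generate() to 5 s

    DatagenConnectorConfig config =
        new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);
    // Returns 5000 here; returns null when the key is absent, preserving old behavior.
    Long timeout = config.getGenerateTimeout();
    System.out.println("generate.timeout = " + timeout);
  }
}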

164 changes: 52 additions & 112 deletions src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
@@ -16,24 +16,26 @@

package io.confluent.kafka.connect.datagen;

import com.google.common.collect.ImmutableSet;
import io.confluent.avro.random.generator.Generator;
import io.confluent.connect.avro.AvroData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData.Record;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
@@ -43,14 +45,15 @@

public class DatagenTask extends SourceTask {

static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
private static final Logger log = LoggerFactory.getLogger(DatagenTask.class);

private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA;
public static final String TASK_ID = "task.id";
public static final String TASK_GENERATION = "task.generation";
public static final String CURRENT_ITERATION = "current.iteration";
public static final String RANDOM_SEED = "random.seed";

private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor();

private DatagenConnectorConfig config;
private String topic;
@@ -60,7 +63,7 @@ public class DatagenTask extends SourceTask {
private String schemaKeyField;
private Generator generator;
private org.apache.avro.Schema avroSchema;
private org.apache.kafka.connect.data.Schema ksqlSchema;
private Schema ksqlSchema;
private AvroData avroData;
private int taskId;
private Map<String, Object> sourcePartition;
@@ -84,12 +87,15 @@ protected enum Quickstart {
STORES("stores.avro", "store_id"),
CREDIT_CARDS("credit_cards.avro", "card_id");

static final Set<String> configValues = new HashSet<>();
private static final Set<String> configValues;

static {
for (Quickstart q : Quickstart.values()) {
configValues.add(q.name().toLowerCase());
}
ImmutableSet.Builder<String> immutableSetBuilder = ImmutableSet.builder();
Arrays.stream(Quickstart.values())
.map(Quickstart::name)
.map(String::toLowerCase)
.forEach(immutableSetBuilder::add);
configValues = immutableSetBuilder.build();
}

private final String schemaFilename;
@@ -100,6 +106,10 @@ protected enum Quickstart {
this.keyName = keyName;
}

public static Set<String> configValues() {
return configValues;
}

public String getSchemaFilename() {
return schemaFilename;
}
@@ -156,7 +166,7 @@ public void start(Map<String, String> props) {
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> poll() {

if (maxInterval > 0) {
try {
@@ -166,29 +176,8 @@ public List<SourceRecord> poll() throws InterruptedException {
return null;
}
}

final Object generatedObject = generator.generate();
if (!(generatedObject instanceof GenericRecord)) {
throw new RuntimeException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;

final List<Object> genericRowValues = new ArrayList<>();
[Member Author, @garrix-fan, Aug 2, 2022] Removing this unused variable and its computation.

for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
final Object value = randomAvroMessage.get(field.name());
if (value instanceof Record) {
final Record record = (Record) value;
final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
genericRowValues.add(optionValue);
} else {
genericRowValues.add(value);
}
}

final GenericRecord randomAvroMessage = generateRecord();

// Key
SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
if (!schemaKeyField.isEmpty()) {
@@ -199,7 +188,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}

// Value
final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
final Schema messageSchema = avroData.toConnectSchema(avroSchema);
final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();

if (maxRecords > 0 && count >= maxRecords) {
@@ -245,83 +234,34 @@ public List<SourceRecord> poll() throws InterruptedException {
return records;
}

@Override
public void stop() {
}

private org.apache.kafka.connect.data.Schema getOptionalSchema(
final org.apache.kafka.connect.data.Schema schema
) {
switch (schema.type()) {
case BOOLEAN:
return org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
case INT32:
return org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA;
case INT64:
return org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA;
case FLOAT64:
return org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
case STRING:
return org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
case ARRAY:
return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build();
case MAP:
return SchemaBuilder.map(
getOptionalSchema(schema.keySchema()),
getOptionalSchema(schema.valueSchema())
).optional().build();
case STRUCT:
final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
for (Field field : schema.fields()) {
schemaBuilder.field(
field.name(),
getOptionalSchema(field.schema())
);
}
return schemaBuilder.optional().build();
default:
throw new ConnectException("Unsupported type: " + schema);
private GenericRecord generateRecord() {
Future<Object> generatedObjectFuture = generateExecutor.submit(generator::generate);
Long timeout = config.getGenerateTimeout();
Object generatedObject;
try {
if (timeout == null) {
generatedObject = generatedObjectFuture.get();
} else {
generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | ExecutionException e) {
generatedObjectFuture.cancel(true);
[Member] Neither future.cancel(true) nor generateExecutor.shutdown() can actually halt the thread running inside the executor service. For that to happen gracefully, we would need logic inside the generator class that checks Thread.currentThread().isInterrupted(); see https://stackoverflow.com/a/45107962/3909043.

throw new ConnectException("Unable to generate random record", e);
} catch (TimeoutException e) {
generatedObjectFuture.cancel(true);
throw new ConnectException("Record generation timed out", e);
}
if (!(generatedObject instanceof GenericRecord)) {
throw new ConnectException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
return (GenericRecord) generatedObject;
}
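To make the reviewer's point above concrete, here is a hypothetical sketch of a cooperative interruption check. The avro-random-generator Generator class is not known to expose such a hook, so the loop and method names below are assumptions used only to illustrate how a cancelled future's interrupt could be observed.

import org.apache.kafka.connect.errors.ConnectException;

final class InterruptibleGenerationSketch {

  // Hypothetical generation loop: future.cancel(true) only sets the thread's
  // interrupt flag, so long-running generation must poll it to stop early.
  Object generateWithInterruptChecks() {
    Object value = null;
    for (int field = 0; field < 1_000; field++) {
      if (Thread.currentThread().isInterrupted()) {
        // Bail out promptly instead of waiting for an expensive regex or a
        // deeply recursive schema to finish on its own.
        throw new ConnectException("Record generation was cancelled");
      }
      value = expensiveFieldGeneration(field); // stands in for the real per-field work
    }
    return value;
  }

  private Object expensiveFieldGeneration(int field) {
    return field;
  }
}

Until something like this exists inside the generator, the timeout in this PR bounds how long poll() waits, but the generation thread itself may keep running in the background after cancellation.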

private Object getOptionalValue(
final org.apache.kafka.connect.data.Schema schema,
final Object value
) {
switch (schema.type()) {
case BOOLEAN:
case INT32:
case INT64:
case FLOAT64:
case STRING:
return value;
case ARRAY:
final List<?> list = (List<?>) value;
return list.stream()
.map(listItem -> getOptionalValue(schema.valueSchema(), listItem))
.collect(Collectors.toList());
case MAP:
final Map<?, ?> map = (Map<?, ?>) value;
return map.entrySet().stream()
.collect(Collectors.toMap(
k -> getOptionalValue(schema.keySchema(), k),
v -> getOptionalValue(schema.valueSchema(), v)
));
case STRUCT:
final Struct struct = (Struct) value;
final Struct optionalStruct = new Struct(getOptionalSchema(schema));
for (Field field : schema.fields()) {
optionalStruct.put(
field.name(),
getOptionalValue(
field.schema(),
struct.get(field.name())
)
);
}
return optionalStruct;
default:
throw new ConnectException("Invalid value schema: " + schema + ", value = " + value);
}
@Override
public void stop() {
[Member] Shut down the executor?

[Member Author] Thanks! Missed this.

generateExecutor.shutdown();
}
}
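The PR's stop() now calls generateExecutor.shutdown(). A slightly stricter variant, shown here only as a sketch of the standard ExecutorService shutdown pattern rather than as part of this change, also waits briefly and then forces shutdown so an in-flight generate() cannot keep the worker thread alive indefinitely.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdownSketch {

  static void shutdownQuietly(ExecutorService executor) {
    executor.shutdown(); // stop accepting new tasks
    try {
      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // interrupt a still-running generate task
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
  }
}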
@@ -220,6 +220,30 @@ public void shouldNotValidateSchemaKeyFieldWhenMultipleSchemaSourcesAreSet() {
);
}

@Test
public void shouldAllowSettingGenerateTimeoutInRange() {
clearSchemaSources();
config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "100");
Config validated = connector.validate(config);
assertThat(validated, hasNoValidationErrorsFor(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF));
}

@Test
public void shouldNotAllowSettingGenerateTimeoutNegative() {
clearSchemaSources();
config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "-1");
Config validated = connector.validate(config);
assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1));
}

@Test
public void shouldNotAllowSettingGenerateTimeoutOutOfRange() {
clearSchemaSources();
config.put(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, "70000");
Config validated = connector.validate(config);
assertThat(validated, hasValidationError(DatagenConnectorConfig.GENERATE_TIMEOUT_CONF, 1));
}

protected void assertTaskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
assertEquals(maxTasks, taskConfigs.size());