[CCLOG-1916] Adding timeout support for generator.generate() #112
Base: master
DatagenConnectorConfig.java
@@ -49,6 +49,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
   private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. "
       + "Two connectors started with the same seed will deterministically produce the same data. "
       + "Each task will generate different data than the other tasks in the same connector.";
+  public static final String GENERATE_TIMEOUT_CONF = "generate.timeout";
+  private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message "
+      + "generation. This timeout can be configured for up to 1 minute, i.e. 60000 ms.";
 
   public DatagenConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
     super(config, parsedConfig);
@@ -90,7 +93,17 @@ public static ConfigDef conf() {
             Importance.HIGH,
             QUICKSTART_DOC
         )
-        .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
+        .define(RANDOM_SEED_CONF,
+            Type.LONG,
+            null,
+            Importance.LOW,
+            RANDOM_SEED_DOC)
+        .define(GENERATE_TIMEOUT_CONF,
+            Type.LONG,
+            null,
+            new GenerateTimeoutValidator(),
+            Importance.LOW,
+            GENERATE_TIMEOUT_DOC);
   }
 
   public String getKafkaTopic() {
@@ -131,6 +144,10 @@ public String getSchemaString() {
     return this.getString(SCHEMA_STRING_CONF);
   }
 
+  public Long getGenerateTimeout() {
+    return this.getLong(GENERATE_TIMEOUT_CONF);
+  }
+
   public Schema getSchema() {
     String quickstart = getQuickstart();
     if (quickstart != null && !quickstart.isEmpty()) {
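As a usage sketch (not part of the PR), the new property is set like any other connector config and read back through the accessor above; the kafka.topic and quickstart keys and their values here are illustrative assumptions, only generate.timeout comes from this change:

    Map<String, String> props = new HashMap<>();
    props.put("kafka.topic", "test-topic");      // assumed pre-existing connector config
    props.put("quickstart", "users");            // assumed pre-existing connector config
    props.put("generate.timeout", "5000");       // new in this PR: 5 s budget per record
    DatagenConnectorConfig config =
        new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);
    Long timeout = config.getGenerateTimeout();  // 5000L here; null when the key is absent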
@@ -163,11 +180,11 @@ public void ensureValid(String name, Object value) {
       if (((String) value).isEmpty()) {
         return;
       }
-      if (!Quickstart.configValues.contains(((String) value).toLowerCase())) {
+      if (!Quickstart.configValues().contains(((String) value).toLowerCase())) {
         throw new ConfigException(String.format(
             "%s must be one out of %s",
             name,
-            String.join(",", DatagenTask.Quickstart.configValues)
+            String.join(",", DatagenTask.Quickstart.configValues())
         ));
       }
     }
@@ -194,5 +211,20 @@ public void ensureValid(String name, Object value) {
       ConfigUtils.getSchemaFromSchemaFileName((String) value);
     }
   }
+
+  private static class GenerateTimeoutValidator implements Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+      if (value == null) {
+        return;
+      }
+      long longValue = (Long) value;
+      if (longValue > 0 && longValue <= 60000L) {
+        return;
+      }
+      throw new ConfigException(name + " must be in the range [1, 60000] ms");
+    }
+  }
 }

Review comment on the 60000L bound: 1 minute sounds like a reasonable limit, but should we allow a bigger range than this? We can check the average time it takes to generate a full payload with the existing quickstart schemas.
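A quick sketch of how the validator surfaces through the public conf() definition; the out-of-range value is an arbitrary example:

    Map<String, String> props = new HashMap<>();
    props.put("generate.timeout", "120000");  // > 60000 ms, so validation fails
    // Parsing throws ConfigException:
    //   "generate.timeout must be in the range [1, 60000] ms"
    new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);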
DatagenTask.java
@@ -16,24 +16,26 @@
 
 package io.confluent.kafka.connect.datagen;
 
+import com.google.common.collect.ImmutableSet;
 import io.confluent.avro.random.generator.Generator;
 import io.confluent.connect.avro.AvroData;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.avro.generic.GenericData.Record;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.data.SchemaBuilder;
-import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -43,14 +45,15 @@
 
 public class DatagenTask extends SourceTask {
 
-  static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
+  private static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
 
   private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA;
   public static final String TASK_ID = "task.id";
   public static final String TASK_GENERATION = "task.generation";
   public static final String CURRENT_ITERATION = "current.iteration";
   public static final String RANDOM_SEED = "random.seed";
 
+  private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor();
+
   private DatagenConnectorConfig config;
   private String topic;
@@ -60,7 +63,7 @@ public class DatagenTask extends SourceTask {
   private String schemaKeyField;
   private Generator generator;
   private org.apache.avro.Schema avroSchema;
-  private org.apache.kafka.connect.data.Schema ksqlSchema;
+  private Schema ksqlSchema;
   private AvroData avroData;
   private int taskId;
   private Map<String, Object> sourcePartition;
@@ -84,12 +87,15 @@ protected enum Quickstart {
     STORES("stores.avro", "store_id"),
     CREDIT_CARDS("credit_cards.avro", "card_id");
 
-    static final Set<String> configValues = new HashSet<>();
+    private static final Set<String> configValues;
 
     static {
-      for (Quickstart q : Quickstart.values()) {
-        configValues.add(q.name().toLowerCase());
-      }
+      ImmutableSet.Builder<String> immutableSetBuilder = ImmutableSet.builder();
+      Arrays.stream(Quickstart.values())
+          .map(Quickstart::name)
+          .map(String::toLowerCase)
+          .forEach(immutableSetBuilder::add);
+      configValues = immutableSetBuilder.build();
     }
 
     private final String schemaFilename;
@@ -100,6 +106,10 @@ protected enum Quickstart {
       this.keyName = keyName;
     }
 
+    public static Set<String> configValues() {
+      return configValues;
+    }
+
     public String getSchemaFilename() {
       return schemaFilename;
    }
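One practical consequence of the switch to ImmutableSet plus an accessor: a hypothetical caller that used to mutate the static field now fails fast instead of silently changing the shared set:

    Set<String> values = DatagenTask.Quickstart.configValues();
    values.add("oops");  // throws UnsupportedOperationException: Guava's ImmutableSet rejects mutation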
@@ -156,7 +166,7 @@ public void start(Map<String, String> props) {
   }
 
   @Override
-  public List<SourceRecord> poll() throws InterruptedException {
+  public List<SourceRecord> poll() {
 
     if (maxInterval > 0) {
       try {
@@ -166,29 +176,8 @@ public List<SourceRecord> poll() throws InterruptedException {
         return null;
       }
     }
 
-    final Object generatedObject = generator.generate();
-    if (!(generatedObject instanceof GenericRecord)) {
-      throw new RuntimeException(String.format(
-          "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
-          generatedObject.getClass().getName()
-      ));
-    }
-    final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;
-
-    final List<Object> genericRowValues = new ArrayList<>();
-
-    for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
-      final Object value = randomAvroMessage.get(field.name());
-      if (value instanceof Record) {
-        final Record record = (Record) value;
-        final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
-        Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
-        genericRowValues.add(optionValue);
-      } else {
-        genericRowValues.add(value);
-      }
-    }
+    final GenericRecord randomAvroMessage = generateRecord();
 
     // Key
     SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
     if (!schemaKeyField.isEmpty()) {

Author note on the deleted genericRowValues block: removing this unused variable and computation.
@@ -199,7 +188,7 @@ public List<SourceRecord> poll() throws InterruptedException {
     }
 
     // Value
-    final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
+    final Schema messageSchema = avroData.toConnectSchema(avroSchema);
     final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();
 
     if (maxRecords > 0 && count >= maxRecords) {
@@ -245,83 +234,34 @@ public List<SourceRecord> poll() throws InterruptedException {
     return records;
   }
 
-  @Override
-  public void stop() {
-  }
-
-  private org.apache.kafka.connect.data.Schema getOptionalSchema(
-      final org.apache.kafka.connect.data.Schema schema
-  ) {
-    switch (schema.type()) {
-      case BOOLEAN:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
-      case INT32:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA;
-      case INT64:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA;
-      case FLOAT64:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
-      case STRING:
-        return org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;
-      case ARRAY:
-        return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build();
-      case MAP:
-        return SchemaBuilder.map(
-            getOptionalSchema(schema.keySchema()),
-            getOptionalSchema(schema.valueSchema())
-        ).optional().build();
-      case STRUCT:
-        final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
-        for (Field field : schema.fields()) {
-          schemaBuilder.field(
-              field.name(),
-              getOptionalSchema(field.schema())
-          );
-        }
-        return schemaBuilder.optional().build();
-      default:
-        throw new ConnectException("Unsupported type: " + schema);
+  private GenericRecord generateRecord() {
+    Future<Object> generatedObjectFuture = generateExecutor.submit(generator::generate);
+    Long timeout = config.getGenerateTimeout();
+    Object generatedObject;
+    try {
+      if (timeout == null) {
+        generatedObject = generatedObjectFuture.get();
+      } else {
+        generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS);
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      generatedObjectFuture.cancel(true);
+      throw new ConnectException("Unable to generate random record", e);
+    } catch (TimeoutException e) {
+      generatedObjectFuture.cancel(true);
+      throw new ConnectException("Record generation timed out", e);
     }
+    if (!(generatedObject instanceof GenericRecord)) {
+      throw new ConnectException(String.format(
+          "Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
+          generatedObject.getClass().getName()
+      ));
+    }
+    return (GenericRecord) generatedObject;
   }
 
-  private Object getOptionalValue(
-      final org.apache.kafka.connect.data.Schema schema,
-      final Object value
-  ) {
-    switch (schema.type()) {
-      case BOOLEAN:
-      case INT32:
-      case INT64:
-      case FLOAT64:
-      case STRING:
-        return value;
-      case ARRAY:
-        final List<?> list = (List<?>) value;
-        return list.stream()
-            .map(listItem -> getOptionalValue(schema.valueSchema(), listItem))
-            .collect(Collectors.toList());
-      case MAP:
-        final Map<?, ?> map = (Map<?, ?>) value;
-        return map.entrySet().stream()
-            .collect(Collectors.toMap(
-                k -> getOptionalValue(schema.keySchema(), k),
-                v -> getOptionalValue(schema.valueSchema(), v)
-            ));
-      case STRUCT:
-        final Struct struct = (Struct) value;
-        final Struct optionalStruct = new Struct(getOptionalSchema(schema));
-        for (Field field : schema.fields()) {
-          optionalStruct.put(
-              field.name(),
-              getOptionalValue(
-                  field.schema(),
-                  struct.get(field.name())
-              )
-          );
-        }
-        return optionalStruct;
-      default:
-        throw new ConnectException("Invalid value schema: " + schema + ", value = " + value);
-    }
-  }
+  @Override
+  public void stop() {
+    generateExecutor.shutdown();
+  }
 }

Review comment on stop(): shutdown executor?
Reply: Thanks! Missed this.
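For readers unfamiliar with the pattern, here is a self-contained sketch of the same Future-with-timeout mechanism, with a deliberately slow Callable standing in for generator::generate (all names in this demo are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimeoutPatternDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Slow task standing in for generator::generate, e.g. a pathological regex.
        Future<String> future = executor.submit(() -> {
          Thread.sleep(10_000);
          return "record";
        });
        try {
          System.out.println(future.get(1_000, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
          // Interrupt the stuck generation; without cancel(true) the single worker
          // thread would stay busy and every later submit() would queue behind it.
          future.cancel(true);
          System.out.println("generation timed out");
        } finally {
          executor.shutdown();  // mirrors the shutdown added to DatagenTask.stop()
        }
      }
    }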
Review thread on the default value for generate.timeout:

Review comment: Can we define a default value?

Reply: For backward compatibility with already deployed connectors that don't use this config value, it's better to leave the default as null.

Review comment: Why don't we use ConfigDef.NO_DEFAULT_VALUE here?

Reply: If there is a default value, say 1000, it will get applied to already deployed connectors on upgrades just fine. ConfigDef.NO_DEFAULT_VALUE, on the other hand, implies that the config has no default value and must be specified, which would actually be backward incompatible: existing connectors would fail on upgrades.

Reply: But what if someone has a regex which takes more time than this default value?

Reply: So let's go ahead with the null default value in that case! For existing connectors we retain the existing behavior, unless the config is explicitly specified.

Reply: @garrix-fan that makes sense, I was just trying to say that we can't use "no default value"; null is fine.
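A minimal sketch of the distinction the thread settles on, using Kafka's ConfigDef directly (the class and key names are real; the demo itself is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class NullDefaultDemo {
      public static void main(String[] args) {
        // null default: the key is optional, and absent means "no timeout".
        // ConfigDef.NO_DEFAULT_VALUE would instead make the key required,
        // breaking already deployed connectors on upgrade.
        ConfigDef def = new ConfigDef()
            .define("generate.timeout", Type.LONG, null, Importance.LOW,
                "Generation timeout in ms");

        AbstractConfig absent = new AbstractConfig(def, new HashMap<String, String>());
        System.out.println(absent.getLong("generate.timeout"));  // null -> wait indefinitely

        Map<String, String> props = new HashMap<>();
        props.put("generate.timeout", "5000");
        System.out.println(new AbstractConfig(def, props).getLong("generate.timeout"));  // 5000
      }
    }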