Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCLOG-1916] Adding timeout support for generator.generate() #112

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e6f1878
Resolving indentation
garrix-fan Aug 2, 2022
b5ef979
Changing modifier for logger
garrix-fan Aug 2, 2022
ee05caa
Adding timeout config and doc
garrix-fan Aug 2, 2022
b38ac7c
Added executor for timeout bound call to generator.generate()
garrix-fan Aug 2, 2022
98256f0
Added DatagenException
garrix-fan Aug 2, 2022
2271315
Renamed DatagenException to DatagenTaskException
garrix-fan Aug 2, 2022
1e0ee07
Changed modifier for DatagenTaskException
garrix-fan Aug 2, 2022
34d0e69
Removing unused variable in start()
garrix-fan Aug 2, 2022
f2847f2
Removing unused import
garrix-fan Aug 2, 2022
0c0f43a
Replacing DatagenTaskException with ConnectException
garrix-fan Aug 2, 2022
eaebfdc
Replacing fully qualified names with class name
garrix-fan Aug 2, 2022
e26d4e9
Minor refactoring
garrix-fan Aug 2, 2022
1b7592b
Removed unused methods
garrix-fan Aug 2, 2022
eb4660d
Removed unused imports
garrix-fan Aug 2, 2022
4faaaf5
Resolving checkstyle errors
garrix-fan Aug 2, 2022
0be2c79
Making configValues immmutable set
garrix-fan Aug 2, 2022
47d95e9
Minor refactoring
garrix-fan Aug 2, 2022
adb4e35
Added Unit Tests for timeout and stackoverflow error
garrix-fan Aug 2, 2022
d069b14
Added test resource files
garrix-fan Aug 2, 2022
9c3b0f1
Added newline
garrix-fan Aug 2, 2022
9610758
Added validator to generate.timeout
garrix-fan Aug 3, 2022
fe81918
Added null check for generate.timeout validator
garrix-fan Aug 3, 2022
9728f7e
Minor refactoring
garrix-fan Aug 3, 2022
b663c97
Added Unit Tests for generate.timeout config
garrix-fan Aug 3, 2022
148cfa5
Added shutdown for the executor
garrix-fan Aug 4, 2022
b99be5a
Cancelled future before throwing ConnectException
garrix-fan Aug 4, 2022
2d40594
Removing Exception from method signature
garrix-fan Aug 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. "
+ "Two connectors started with the same seed will deterministically produce the same data. "
+ "Each task will generate different data than the other tasks in the same connector.";
private static final String GENERATE_TIMEOUT_CONF = "generate.timeout";
private static final String GENERATE_TIMEOUT_DOC = "Timeout in milliseconds for random message "
+ "generation";

public DatagenConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand Down Expand Up @@ -90,7 +93,8 @@ public static ConfigDef conf() {
Importance.HIGH,
QUICKSTART_DOC
)
.define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
.define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC)
.define(GENERATE_TIMEOUT_CONF, Type.LONG, null, Importance.LOW, GENERATE_TIMEOUT_DOC);
}

public String getKafkaTopic() {
Expand Down Expand Up @@ -131,6 +135,10 @@ public String getSchemaString() {
return this.getString(SCHEMA_STRING_CONF);
}

public Long getGenerateTimeout() {
return this.getLong(GENERATE_TIMEOUT_CONF);
}

public Schema getSchema() {
String quickstart = getQuickstart();
if (quickstart != null && !quickstart.isEmpty()) {
Expand Down
65 changes: 37 additions & 28 deletions src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -43,14 +48,15 @@

public class DatagenTask extends SourceTask {

static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
private static final Logger log = LoggerFactory.getLogger(DatagenTask.class);

private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA;
public static final String TASK_ID = "task.id";
public static final String TASK_GENERATION = "task.generation";
public static final String CURRENT_ITERATION = "current.iteration";
public static final String RANDOM_SEED = "random.seed";

private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor();

private DatagenConnectorConfig config;
private String topic;
Expand Down Expand Up @@ -156,7 +162,7 @@ public void start(Map<String, String> props) {
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> poll() throws ConnectException {

if (maxInterval > 0) {
try {
Expand All @@ -166,29 +172,8 @@ public List<SourceRecord> poll() throws InterruptedException {
return null;
}
}

final Object generatedObject = generator.generate();
if (!(generatedObject instanceof GenericRecord)) {
throw new RuntimeException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;

final List<Object> genericRowValues = new ArrayList<>();
Copy link
Member Author

@garrix-fan garrix-fan Aug 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this unused variable and computation

for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
final Object value = randomAvroMessage.get(field.name());
if (value instanceof Record) {
final Record record = (Record) value;
final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
genericRowValues.add(optionValue);
} else {
genericRowValues.add(value);
}
}

final GenericRecord randomAvroMessage = generateRecord();

// Key
SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
if (!schemaKeyField.isEmpty()) {
Expand Down Expand Up @@ -245,6 +230,30 @@ public List<SourceRecord> poll() throws InterruptedException {
return records;
}

private GenericRecord generateRecord() throws ConnectException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to throw explicitly ConnectException

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a runtime exception.

Future<Object> generatedObjectFuture = generateExecutor.submit(generator::generate);
Long timeout = config.getGenerateTimeout();
Object generatedObject;
try {
if (timeout == null) {
generatedObject = generatedObjectFuture.get();
} else {
generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | ExecutionException e) {
throw new ConnectException("Unable to generate random record", e);
} catch (TimeoutException e) {
throw new ConnectException("Record generation timed out", e);
}
if (!(generatedObject instanceof GenericRecord)) {
throw new ConnectException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
return (GenericRecord) generatedObject;
}

@Override
public void stop() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown executor ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Missed this

}
Expand Down Expand Up @@ -298,8 +307,8 @@ private Object getOptionalValue(
case ARRAY:
final List<?> list = (List<?>) value;
return list.stream()
.map(listItem -> getOptionalValue(schema.valueSchema(), listItem))
.collect(Collectors.toList());
.map(listItem -> getOptionalValue(schema.valueSchema(), listItem))
.collect(Collectors.toList());
case MAP:
final Map<?, ?> map = (Map<?, ?>) value;
return map.entrySet().stream()
Expand Down