-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCLOG-1916] Adding timeout support for generator.generate() #112
base: master
Are you sure you want to change the base?
Changes from 10 commits
e6f1878
b5ef979
ee05caa
b38ac7c
98256f0
2271315
1e0ee07
34d0e69
f2847f2
0c0f43a
eaebfdc
e26d4e9
1b7592b
eb4660d
4faaaf5
0be2c79
47d95e9
adb4e35
d069b14
9c3b0f1
9610758
fe81918
9728f7e
b663c97
148cfa5
b99be5a
2d40594
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,8 +26,13 @@ | |
import java.util.Map; | ||
import java.util.Random; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.stream.Collectors; | ||
import org.apache.avro.generic.GenericData.Record; | ||
import org.apache.avro.generic.GenericRecord; | ||
import org.apache.kafka.connect.data.Field; | ||
import org.apache.kafka.connect.data.Schema; | ||
|
@@ -43,14 +48,15 @@ | |
|
||
public class DatagenTask extends SourceTask { | ||
|
||
static final Logger log = LoggerFactory.getLogger(DatagenTask.class); | ||
private static final Logger log = LoggerFactory.getLogger(DatagenTask.class); | ||
|
||
private static final Schema DEFAULT_KEY_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA; | ||
public static final String TASK_ID = "task.id"; | ||
public static final String TASK_GENERATION = "task.generation"; | ||
public static final String CURRENT_ITERATION = "current.iteration"; | ||
public static final String RANDOM_SEED = "random.seed"; | ||
|
||
private final ExecutorService generateExecutor = Executors.newSingleThreadExecutor(); | ||
|
||
private DatagenConnectorConfig config; | ||
private String topic; | ||
|
@@ -156,7 +162,7 @@ public void start(Map<String, String> props) { | |
} | ||
|
||
@Override | ||
public List<SourceRecord> poll() throws InterruptedException { | ||
public List<SourceRecord> poll() throws ConnectException { | ||
|
||
if (maxInterval > 0) { | ||
try { | ||
|
@@ -166,29 +172,8 @@ public List<SourceRecord> poll() throws InterruptedException { | |
return null; | ||
} | ||
} | ||
|
||
final Object generatedObject = generator.generate(); | ||
if (!(generatedObject instanceof GenericRecord)) { | ||
throw new RuntimeException(String.format( | ||
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead", | ||
generatedObject.getClass().getName() | ||
)); | ||
} | ||
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject; | ||
|
||
final List<Object> genericRowValues = new ArrayList<>(); | ||
for (org.apache.avro.Schema.Field field : avroSchema.getFields()) { | ||
final Object value = randomAvroMessage.get(field.name()); | ||
if (value instanceof Record) { | ||
final Record record = (Record) value; | ||
final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value(); | ||
Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue); | ||
genericRowValues.add(optionValue); | ||
} else { | ||
genericRowValues.add(value); | ||
} | ||
} | ||
|
||
final GenericRecord randomAvroMessage = generateRecord(); | ||
|
||
// Key | ||
SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null); | ||
if (!schemaKeyField.isEmpty()) { | ||
|
@@ -245,6 +230,30 @@ public List<SourceRecord> poll() throws InterruptedException { | |
return records; | ||
} | ||
|
||
private GenericRecord generateRecord() throws ConnectException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need to throw explicitly There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its a runtime exception. |
||
Future<Object> generatedObjectFuture = generateExecutor.submit(generator::generate); | ||
Long timeout = config.getGenerateTimeout(); | ||
Object generatedObject; | ||
try { | ||
if (timeout == null) { | ||
generatedObject = generatedObjectFuture.get(); | ||
} else { | ||
generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); | ||
} | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw new ConnectException("Unable to generate random record", e); | ||
} catch (TimeoutException e) { | ||
throw new ConnectException("Record generation timed out", e); | ||
} | ||
if (!(generatedObject instanceof GenericRecord)) { | ||
throw new ConnectException(String.format( | ||
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead", | ||
generatedObject.getClass().getName() | ||
)); | ||
} | ||
return (GenericRecord) generatedObject; | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shutdown executor ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! Missed this |
||
} | ||
|
@@ -298,8 +307,8 @@ private Object getOptionalValue( | |
case ARRAY: | ||
final List<?> list = (List<?>) value; | ||
return list.stream() | ||
.map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) | ||
.collect(Collectors.toList()); | ||
.map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) | ||
.collect(Collectors.toList()); | ||
case MAP: | ||
final Map<?, ?> map = (Map<?, ?>) value; | ||
return map.entrySet().stream() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this unused variable and computation