-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CCLOG-1916] Adding timeout support for generator.generate() #112
base: master
Are you sure you want to change the base?
Conversation
} | ||
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject; | ||
|
||
final List<Object> genericRowValues = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this unused variable and computation
275cb6a
to
b663c97
Compare
Importance.LOW, | ||
RANDOM_SEED_DOC) | ||
.define(GENERATE_TIMEOUT_CONF, | ||
Type.LONG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we define a default value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For backward compatibility of already deployed connectors which don't use this config value, its better to leave default as null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we use ConfigDef.NO_DEFAULT_VALUE
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For backward compatibility of already deployed connectors which don't use this config value, its better to leave default as null.
If there is a default value, say 1000
, it will get applied to already deployed connectors on upgrades just fine.
Why don't we use ConfigDef.NO_DEFAULT_VALUE here?
That implies that the config has no default value and must be specified, which would actually be backward incompatible and existing connectors would fail on upgrades.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a default value, say
1000
, it will get applied to already deployed connectors on upgrades just fine.
But what if someone has a regex which takes more time than this default value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so, let's go ahead with null default value in that case! for existing connectors, we can retain existing behavior, unless the config is explicitly specified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@garrix-fan that makes sense, I was just trying to say that we can't use no default value - null
is fine.
throw new ConnectException("Invalid value schema: " + schema + ", value = " + value); | ||
} | ||
@Override | ||
public void stop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdown executor ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Missed this
Importance.LOW, | ||
RANDOM_SEED_DOC) | ||
.define(GENERATE_TIMEOUT_CONF, | ||
Type.LONG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For backward compatibility of already deployed connectors which don't use this config value, its better to leave default as null.
Importance.LOW, | ||
RANDOM_SEED_DOC) | ||
.define(GENERATE_TIMEOUT_CONF, | ||
Type.LONG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we use ConfigDef.NO_DEFAULT_VALUE
here?
return; | ||
} | ||
long longValue = (Long) value; | ||
if (longValue > 0 && longValue <= 60000L) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 minute sounds like a reasonable limit, but should we allow a bigger range than this? we can see what the average time we generate full payload on the existing quickstart schemas
return schemaBuilder.optional().build(); | ||
default: | ||
throw new ConnectException("Unsupported type: " + schema); | ||
private GenericRecord generateRecord() throws ConnectException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to throw explicitly ConnectException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its a runtime exception.
generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS); | ||
} | ||
} catch (InterruptedException | ExecutionException e) { | ||
generatedObjectFuture.cancel(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both future.cancel(true)
as well as generatorExecutor.shutDown()
won't be able to stop or bring to halt, the thread within the executor service. For that to happen gracefully, we might need to add a logic which checks Thread.currentThread().isInterrupted()
from within the generator class.
https://stackoverflow.com/a/45107962/3909043
Problem
Avro random library 'generator.generate()' is a blocking call in task
poll()
method of datagen source connector. Practically, it is possible to create a regex which can make this method run indefinitely.Solution
Idea is to add a timeout for the same post which the task will fail with an exception. Timeout would be a parameter which can be configured.
Does this solution apply anywhere else?
If yes, where?
NA
Test Strategy
Testing done:
Release Plan