[CCLOG-1916] Adding timeout support for generator.generate() #112

Open: wants to merge 27 commits into base: master

Member

@garrix-fan garrix-fan commented Aug 2, 2022

Problem

The Avro random library's 'generator.generate()' is a blocking call inside the task's poll() method in the datagen source connector. In practice, it is possible to write a regex that makes this method run indefinitely.

Solution

The idea is to add a timeout to this call, after which the task fails with an exception. The timeout is a configurable parameter.
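
A minimal sketch of the approach described above, using only the JDK: the blocking call is submitted to an executor and awaited with `Future.get(timeout, unit)`. The class and method names (`TimedGenerate`, `callWithTimeout`) are hypothetical, and `IllegalStateException` stands in for whatever exception the connector actually throws; this is not the code in the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a blocking call on an executor and gives up after timeoutMs.
final class TimedGenerate {
  private TimedGenerate() {}

  static <T> T callWithTimeout(ExecutorService executor, Callable<T> generate, long timeoutMs) {
    Future<T> future = executor.submit(generate);
    try {
      // Wait at most timeoutMs for the result.
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Request interruption of the worker; the task must cooperate to actually stop.
      future.cancel(true);
      throw new IllegalStateException("generate() did not finish within " + timeoutMs + " ms", e);
    } catch (InterruptedException e) {
      future.cancel(true);
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      throw new IllegalStateException("Interrupted while waiting for generate()", e);
    } catch (ExecutionException e) {
      // The call itself failed; surface its original cause.
      throw new IllegalStateException("generate() failed", e.getCause());
    }
  }
}
```

A caller would pass something like `() -> generator.generate()` as the `Callable` and reuse one executor for the lifetime of the task.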

Does this solution apply anywhere else?
  • yes
  • no
If yes, where?

NA

Test Strategy

Testing done:
  • Unit tests
  • Integration tests
  • System tests
  • Manual tests

Release Plan

}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;

final List<Object> genericRowValues = new ArrayList<>();
Member Author

@garrix-fan garrix-fan Aug 2, 2022

Removing this unused variable and computation

@garrix-fan garrix-fan marked this pull request as ready for review August 2, 2022 09:48
@garrix-fan garrix-fan requested a review from a team as a code owner August 2, 2022 09:48
@garrix-fan garrix-fan requested a review from a team August 2, 2022 09:48
@garrix-fan garrix-fan changed the title Adding timeout support for generator.generate() [CCLOG-1916] Adding timeout support for generator.generate() Aug 2, 2022
@garrix-fan garrix-fan force-pushed the CCLOG-1916-add-timeout-support branch from 275cb6a to b663c97 Compare August 3, 2022 05:44
Importance.LOW,
RANDOM_SEED_DOC)
.define(GENERATE_TIMEOUT_CONF,
Type.LONG,
Member

Can we define a default value?

Member

For backward compatibility with already deployed connectors that don't set this config, it's better to leave the default as null.

Member

Why don't we use ConfigDef.NO_DEFAULT_VALUE here?

Contributor

> For backward compatibility of already deployed connectors which don't use this config value, its better to leave default as null.

If there is a default value, say 1000, it will simply be applied to already deployed connectors when they upgrade.

> Why don't we use ConfigDef.NO_DEFAULT_VALUE here?

That would mean the config has no default value and must be specified, which would actually be backward incompatible: existing connectors would fail on upgrade.

Member Author

@garrix-fan garrix-fan Aug 10, 2022

> If there is a default value, say 1000, it will get applied to already deployed connectors on upgrades just fine.

But what if someone has a regex that takes longer than this default value?

Member

So let's go ahead with a null default value in that case. For existing connectors, we retain the existing behavior unless the config is explicitly specified.

Contributor

@garrix-fan that makes sense. I was only trying to say that we can't use "no default value"; a null default is fine.
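
To make the three options in this thread concrete, here is a plain-Java stand-in (deliberately not Kafka's `ConfigDef`) showing how an already deployed connector config that lacks the new key would be resolved under each policy. The key name `generate.timeout` and all class and method names are assumptions for illustration only.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for the three default policies discussed above.
final class TimeoutDefaults {
  static final String KEY = "generate.timeout"; // hypothetical key name

  private TimeoutDefaults() {}

  // Null default: a missing key means "no timeout", so existing behavior is retained.
  static Optional<Long> withNullDefault(Map<String, String> props) {
    String raw = props.get(KEY);
    return raw == null ? Optional.empty() : Optional.of(Long.parseLong(raw));
  }

  // Fixed default: a missing key silently picks up defaultMs on upgrade.
  static long withFixedDefault(Map<String, String> props, long defaultMs) {
    String raw = props.get(KEY);
    return raw == null ? defaultMs : Long.parseLong(raw);
  }

  // No default (required): a missing key is an error, so existing configs break on upgrade.
  static long required(Map<String, String> props) {
    String raw = props.get(KEY);
    if (raw == null) {
      throw new IllegalArgumentException("Missing required configuration \"" + KEY + "\"");
    }
    return Long.parseLong(raw);
  }
}
```

With the null default, only connectors that explicitly set the key get a timeout; a fixed default could start failing a connector whose regex legitimately takes longer than the chosen value.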

throw new ConnectException("Invalid value schema: " + schema + ", value = " + value);
}
@Override
public void stop() {
Member

Should we shut down the executor here?

Member Author

Thanks! Missed this
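
One way the shutdown requested above could look, as a JDK-only sketch. The wrapper class `GeneratorExecutorOwner` is hypothetical and stands in for the task; the 5-second grace period is an arbitrary assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the task that owns the generator executor.
final class GeneratorExecutorOwner {
  private final ExecutorService generatorExecutor = Executors.newSingleThreadExecutor();

  ExecutorService executor() {
    return generatorExecutor;
  }

  // Returns true if the executor terminated within the grace period.
  boolean stop() {
    // Reject new work and interrupt any task that is still running.
    generatorExecutor.shutdownNow();
    try {
      return generatorExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return false;
    }
  }
}
```

Without some form of shutdown, a non-daemon worker thread can outlive the task after `stop()` returns.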

return;
}
long longValue = (Long) value;
if (longValue > 0 && longValue <= 60000L) {
Member

1 minute sounds like a reasonable limit, but should we allow a larger range than this? We could measure the average time it takes to generate a full payload for the existing quickstart schemas and choose the limit based on that.
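
For reference, the range check under discussion can be sketched as a standalone method. This mirrors the condition visible in the diff (null allowed, otherwise `0 < value <= 60000`), but the class name, method name, and the use of `IllegalArgumentException` are assumptions; the real validator presumably plugs into Kafka's config validation instead.

```java
// Standalone sketch of the range check; not the connector's actual validator class.
final class TimeoutRange {
  static final long MAX_TIMEOUT_MS = 60_000L; // the 1 minute upper bound under discussion

  private TimeoutRange() {}

  static void ensureValid(String name, Object value) {
    if (value == null) {
      return; // unset: no timeout is applied
    }
    long longValue = (Long) value;
    if (longValue > 0 && longValue <= MAX_TIMEOUT_MS) {
      return; // within the accepted range
    }
    throw new IllegalArgumentException(
        name + " must be in the range [1, " + MAX_TIMEOUT_MS + "] ms, got " + longValue);
  }
}
```

Keeping the bound in a single constant would make it cheap to raise later if measurements on the quickstart schemas suggest a larger range.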

return schemaBuilder.optional().build();
default:
throw new ConnectException("Unsupported type: " + schema);
private GenericRecord generateRecord() throws ConnectException {
Member

We don't need to declare throws ConnectException explicitly.

Member

It's a runtime exception.

generatedObject = generatedObjectFuture.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | ExecutionException e) {
generatedObjectFuture.cancel(true);
Member

Neither future.cancel(true) nor generatorExecutor.shutdown() can actually stop the thread running inside the executor service: cancel(true) only interrupts the thread, and shutdown() only stops new tasks from being accepted. For the thread to stop gracefully, we might need to add logic inside the generator class that checks Thread.currentThread().isInterrupted().
https://stackoverflow.com/a/45107962/3909043
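
The cooperative check suggested above could look roughly like this. `InterruptibleWork.countTo` is a hypothetical stand-in for a long-running generation loop, not the Avro random generator's code; the point is only that the loop itself polls the interrupt flag so that `cancel(true)` has an effect.

```java
// Hypothetical long-running loop that cooperates with Thread interruption.
final class InterruptibleWork {
  private InterruptibleWork() {}

  static long countTo(long limit) throws InterruptedException {
    long n = 0;
    while (n < limit) {
      // Without this check, future.cancel(true) would set the flag and the loop would keep running.
      if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException("generation cancelled");
      }
      n++;
    }
    return n;
  }
}
```

Checking on every iteration is the simplest choice; a real generator might check less often, or only inside its regex expansion loop, to keep the overhead low.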

4 participants