Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: static interval between messages #149

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

<groupId>io.confluent.kafka.connect</groupId>
<artifactId>kafka-connect-datagen</artifactId>
<version>0.7.0-SNAPSHOT</version>
<version>0.7.1-SNAPSHOT</version>
<packaging>jar</packaging>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class DatagenConnectorConfig extends AbstractConfig {
private static final String KAFKA_TOPIC_DOC = "Topic to write to";
public static final String MAXINTERVAL_CONF = "max.interval";
private static final String MAXINTERVAL_DOC = "Max interval between messages (ms)";
private static final String STATICINTERVAL_CONF = "static.interval";
private static final String STATICINTERVAL_DOC = "Static interval between messages (ms), "
+ "when set ignores max.interval setting";
public static final String ITERATIONS_CONF = "iterations";
private static final String ITERATIONS_DOC = "Number of messages to send from each task, "
+ "or less than 1 for unlimited";
Expand Down Expand Up @@ -61,6 +64,7 @@ public static ConfigDef conf() {
return new ConfigDef()
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
.define(MAXINTERVAL_CONF, Type.LONG, 500L, Importance.HIGH, MAXINTERVAL_DOC)
.define(STATICINTERVAL_CONF, Type.LONG, 0L, Importance.HIGH, STATICINTERVAL_DOC)
.define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC)
.define(SCHEMA_STRING_CONF,
Type.STRING,
Expand Down Expand Up @@ -100,6 +104,10 @@ public Long getMaxInterval() {
return this.getLong(MAXINTERVAL_CONF);
}

public Long getStaticInterval() {
return this.getLong(STATICINTERVAL_CONF);
}

public Integer getIterations() {
return this.getInt(ITERATIONS_CONF);
}
Expand Down Expand Up @@ -194,4 +202,3 @@ public void ensureValid(String name, Object value) {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ public class DatagenTask extends SourceTask {
public static final String CURRENT_ITERATION = "current.iteration";
public static final String RANDOM_SEED = "random.seed";


private DatagenConnectorConfig config;
private String topic;
private long maxInterval;
private long staticInterval;
private long interval;
private int maxRecords;
private long count = 0L;
private String schemaKeyField;
Expand All @@ -70,6 +71,7 @@ public void start(Map<String, String> props) {
config = new DatagenConnectorConfig(props);
topic = config.getKafkaTopic();
maxInterval = config.getMaxInterval();
staticInterval = config.getStaticInterval();
maxRecords = config.getIterations();
schemaKeyField = config.getSchemaKeyfield();
taskGeneration = 0;
Expand Down Expand Up @@ -108,13 +110,16 @@ public void start(Map<String, String> props) {
@Override
public List<SourceRecord> poll() throws InterruptedException {

if (maxInterval > 0) {
try {
Thread.sleep((long) (maxInterval * Math.random()));
} catch (InterruptedException e) {
Thread.interrupted();
return null;
}
if (staticInterval > 0) {
interval = staticInterval;
} else if (maxInterval > 0) {
interval = (long) (maxInterval * Math.random());
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Thread.interrupted();
return null;
}

final Object generatedObject = generator.generate();
Expand Down