Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19458 Allow multiple caches in Kafka Connect IgniteSink #215

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ public class IgniteSinkTask extends SinkTask {
private static String igniteConfigFile;

/** Cache name. */
private static String cacheName;
private String cacheName;

/** Entry transformer. */
private static StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;
private StreamSingleTupleExtractor<SinkRecord, Object, Object> extractor;

/** Data streamer instance. */
private IgniteDataStreamer streamer;

/** {@inheritDoc} */
@Override public String version() {
Expand All @@ -61,24 +64,10 @@ public class IgniteSinkTask extends SinkTask {
* @param props Task properties.
*/
@Override public void start(Map<String, String> props) {
// Each task has the same parameters -- avoid setting more than once.
if (cacheName != null)
return;

cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);

if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
StreamerContext.getStreamer().allowOverwrite(
Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));

if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
StreamerContext.getStreamer().perNodeBufferSize(
Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));

if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
StreamerContext.getStreamer().perNodeParallelOperations(
Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
streamer = IgniteContext.getIgnite().dataStreamer(cacheName);

if (props.containsKey(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS)) {
String transformerCls = props.get(IgniteSinkConstants.SINGLE_TUPLE_EXTRACTOR_CLASS);
Expand All @@ -96,6 +85,18 @@ public class IgniteSinkTask extends SinkTask {
}
}

if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
streamer.allowOverwrite(
Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));

if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
streamer.perNodeBufferSize(
Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));

if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
streamer.perNodeParallelOperations(
Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));

stopped = false;
}

Expand All @@ -111,11 +112,11 @@ public class IgniteSinkTask extends SinkTask {
// Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
if (extractor != null) {
Map.Entry<Object, Object> entry = extractor.extract(record);
StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
streamer.addData(entry.getKey(), entry.getValue());
}
else {
if (record.key() != null) {
StreamerContext.getStreamer().addData(record.key(), record.value());
streamer.addData(record.key(), record.value());
}
else {
log.error("Failed to stream a record with null key!");
Expand All @@ -139,7 +140,7 @@ public class IgniteSinkTask extends SinkTask {
if (stopped)
return;

StreamerContext.getStreamer().flush();
streamer.flush();
}

/**
Expand All @@ -151,7 +152,7 @@ public class IgniteSinkTask extends SinkTask {

stopped = true;

StreamerContext.getIgnite().close();
IgniteContext.getIgnite().close();
}

/**
Expand All @@ -161,25 +162,20 @@ public class IgniteSinkTask extends SinkTask {
*/
protected static void setStopped(boolean stopped) {
IgniteSinkTask.stopped = stopped;

extractor = null;
}

/**
* Streamer context initializing grid and data streamer instances on demand.
* Ignite context initializing grid instance on demand.
*/
public static class StreamerContext {
public static class IgniteContext {
/** Constructor. */
private StreamerContext() {
private IgniteContext() {
}

/** Instance holder. */
private static class Holder {
/** */
private static final Ignite IGNITE = Ignition.start(igniteConfigFile);

/** */
private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
}

/**
Expand All @@ -190,14 +186,5 @@ private static class Holder {
public static Ignite getIgnite() {
return Holder.IGNITE;
}

/**
* Obtains data streamer instance.
*
* @return Data streamer instance.
*/
public static IgniteDataStreamer getStreamer() {
return Holder.STREAMER;
}
}
}