Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22530 CDC: Add regex filters for cache names #286

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@

package org.apache.ignite.cdc;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand Down Expand Up @@ -67,12 +77,30 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** */
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";

/** File with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_FILE = "caches";

/** CDC directory path. */
private Path cdcDir;

/** Handle only primary entry flag. */
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;

/** Cache names. */
private Set<String> caches;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Compiled include regex patterns for cache names. */
private Set<Pattern> includeFilters;

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Compiled exclude regex patterns for cache names. */
private Set<Pattern> excludeFilters;

/** Cache IDs. */
protected Set<Integer> cachesIds;

Expand All @@ -99,14 +127,28 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
protected IgniteLogger log;

/** {@inheritDoc} */
@Override public void start(MetricRegistry reg) {
@Override public void start(MetricRegistry reg, Path cdcDir) {
A.notEmpty(caches, "caches");

this.cdcDir = cdcDir;

cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());

prepareRegexFilters();

try {
loadCaches().stream()
.filter(this::matchesFilters)
.map(CU::cacheId)
.forEach(cachesIds::add);
}
catch (IOException e) {
throw new IgniteException(e);
}

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
Expand Down Expand Up @@ -144,10 +186,101 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** {@inheritDoc} */
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
cacheEvents.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
matchWithRegexTemplates(e.configuration().getName());
});
}

/**
* Finds match between cache name and user's regex templates.
* If match found, adds this cache's id to id's list and saves cache name to file.
*
* @param cacheName Cache name.
*/
private void matchWithRegexTemplates(String cacheName) {
int cacheId = CU.cacheId(cacheName);

if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
cachesIds.add(cacheId);

try {
saveCache(cacheName);
}
catch (IOException e) {
throw new IgniteException(e);
}

if (log.isInfoEnabled())
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
}
}

/**
* Writes cache name to file
*
* @param cacheName Cache name.
*/
private void saveCache(String cacheName) throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

String cn = cacheName + '\n';

Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
}
}

/**
* Loads saved caches from file.
*
* @return List of saved caches names.
*/
private List<String> loadCaches() throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);

if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}

return Files.readAllLines(savedCachesPath);
}
return Collections.emptyList();
}

/**
* Compiles regex patterns from user templates.
*
* @throws PatternSyntaxException If the template's syntax is invalid
*/
private void prepareRegexFilters() {
includeFilters = includeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());

excludeFilters = excludeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());
}

/**
* Matches cache name with compiled regex patterns.
*
* @param cacheName Cache name.
* @return True if cache name match include patterns and don't match exclude patterns.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeFilters.stream()
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeFilters.stream()
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}

/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(e -> {
Expand Down Expand Up @@ -238,6 +371,30 @@ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
return this;
}

/**
* Sets include regex patterns that participate in CDC.
*
* @param includeTemplates Include regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;

return this;
}

/**
* Sets exclude regex patterns that participate in CDC.
*
* @param excludeTemplates Exclude regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;

return this;
}

/**
* Sets maximum batch size that will be applied to destination cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.cdc;

import java.nio.file.Path;

import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
Expand Down Expand Up @@ -61,8 +63,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
private volatile boolean alive = true;

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
super.start(mreg);
@Override public void start(MetricRegistry mreg, Path cdcDir) {
super.start(mreg, cdcDir);

if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.ignite.cdc.conflictresolve;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
Expand Down Expand Up @@ -65,6 +68,12 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
/** Custom conflict resolver. */
private CacheVersionConflictResolver resolver;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Log. */
private IgniteLogger log;

Expand Down Expand Up @@ -98,7 +107,7 @@ public CacheVersionConflictResolverPluginProvider() {
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
String cacheName = ctx.igniteCacheConfiguration().getName();

if (caches.contains(cacheName)) {
if (caches.contains(cacheName) || matchesFilters(cacheName)) {
log.info("ConflictResolver provider set for cache [cacheName=" + cacheName + ']');

return provider;
Expand Down Expand Up @@ -144,6 +153,16 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
this.resolver = resolver;
}

/** @param includeTemplates Include regex templates */
public void setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;
}

/** @param excludeTemplates Exclude regex templates */
public void setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;
}

/** {@inheritDoc} */
@Override public void start(PluginContext ctx) {
((IgniteEx)ctx.grid()).context().cache().context().versions().dataCenterId(clusterId);
Expand Down Expand Up @@ -178,4 +197,21 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
@Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
return null;
}

/**
* Match cache name with regex patterns.
*
* @param cacheName Cache name.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeTemplates.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeTemplates.stream()
.map(Pattern::compile)
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ protected void runAppliers() {
caches,
metaUpdr,
stopped,
metrics
metrics,
this
);

addAndStart("applier-thread-" + cntr++, applier);
Expand Down Expand Up @@ -252,6 +253,13 @@ private <T extends AutoCloseable & Runnable> void addAndStart(String threadName,
/** Checks that configured caches exist in a destination cluster. */
protected abstract void checkCaches(Collection<String> caches);

/**
* Get cache names from client.
*
* @return Cache names.
* */
protected abstract Collection<String> getCaches();

/** */
private void ackAsciiLogo(IgniteLogger log) {
String ver = "ver. " + ACK_VER_STR;
Expand Down
Loading