Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Jul 29, 2024
1 parent 49cec55 commit 7c13a1d
Show file tree
Hide file tree
Showing 6 changed files with 490 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,28 +84,16 @@ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger lo

/** {@inheritDoc} */
@Override protected KeyCacheObject toKey(CdcEvent evt) {
Object key = evt.key();

if (key instanceof KeyCacheObject)
return (KeyCacheObject)key;
else
return new KeyCacheObjectImpl(key, null, evt.partition());
return evt.keyCacheObject();
}

/** {@inheritDoc} */
@Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) {
CacheObject cacheObj;

Object val = evt.value();

if (val instanceof CacheObject)
cacheObj = (CacheObject)val;
else
cacheObj = new CacheObjectImpl(val, null);
CacheObject val = evt.valueCacheObject();

return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(cacheObj, ver);
new GridCacheDrExpirationInfo(val, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(val, ver);
}

/** @return Cache. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

package org.apache.ignite.cdc.conflictresolve;

import java.util.Objects;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
Expand Down Expand Up @@ -84,11 +90,12 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

boolean useNew = isUseNew(ctx, oldEntry, newEntry);
boolean useNew = isUseNew(ctx, oldEntry, newEntry, prevStateMeta);

if (useNew)
res.useNew();
Expand All @@ -110,7 +117,8 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta
) {
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
return true;
Expand All @@ -132,8 +140,8 @@ protected <K, V> boolean isUseNew(
}

if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Object newVal = newEntry.value(ctx);
Object oldVal = oldEntry.value(ctx);

if (oldVal != null && newVal != null) {
try {
Expand All @@ -146,6 +154,17 @@ protected <K, V> boolean isUseNew(
);
}
}

Object field = oldVal != null ? value(oldVal) : null;

if (Objects.equals(field, prevStateMeta)) // Previous value synchronized.
return true;
}
else {
GridCacheVersion oldVer = oldEntry.value(ctx) != null ? oldEntry.version() : null; // TODO null value version (entry vs row)

if (Objects.equals(oldVer, prevStateMeta)) // Previous value synchronized.
return true;
}

log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
Expand All @@ -155,6 +174,30 @@ protected <K, V> boolean isUseNew(
return false;
}

/**
* {@inheritDoc}
*/
@Override public Object previousStateMetadata(GridCacheEntryEx entry) {
if (conflictResolveFieldEnabled) {
CacheObjectValueContext ctx = entry.context().cacheObjectContext();
CacheObject val = entry.rawGet();

return val != null ?
value(CacheObjectUtils.unwrapBinaryIfNeeded(ctx, val, true, true, null)) :
null;
}
else {
try {
GridCacheVersion ver = entry.version();

return ver != null ? ver.conflictVersion() : null;
}
catch (GridCacheEntryRemovedException e) { // TODO
throw new RuntimeException(e);
}
}
}

/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
return (val instanceof BinaryObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictReso
@Override protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta
) {
boolean res = super.isUseNew(ctx, oldEntry, newEntry);
boolean res = super.isUseNew(ctx, oldEntry, newEntry, prevStateMeta);

Object oldVal = conflictResolveFieldEnabled ? oldEntry.value(ctx) : null;
Object newVal = conflictResolveFieldEnabled ? newEntry.value(ctx) : null;
Expand Down
Loading

0 comments on commit 7c13a1d

Please sign in to comment.