Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Aug 12, 2024
1 parent fe451d5 commit 91f60e7
Show file tree
Hide file tree
Showing 5 changed files with 529 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
Expand Down Expand Up @@ -84,28 +82,16 @@ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger lo

/** {@inheritDoc} */
@Override protected KeyCacheObject toKey(CdcEvent evt) {
Object key = evt.key();

if (key instanceof KeyCacheObject)
return (KeyCacheObject)key;
else
return new KeyCacheObjectImpl(key, null, evt.partition());
return evt.keyCacheObject();
}

/** {@inheritDoc} */
@Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) {
CacheObject cacheObj;

Object val = evt.value();

if (val instanceof CacheObject)
cacheObj = (CacheObject)val;
else
cacheObj = new CacheObjectImpl(val, null);
CacheObject val = evt.valueCacheObject();

return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(cacheObj, ver);
new GridCacheDrExpirationInfo(val, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(val, ver);
}

/** @return Cache. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

package org.apache.ignite.cdc.conflictresolve;

import java.util.Objects;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
Expand Down Expand Up @@ -88,11 +94,12 @@ public CacheVersionConflictResolverImpl(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

boolean useNew = isUseNew(ctx, oldEntry, newEntry);
boolean useNew = isUseNew(ctx, oldEntry, newEntry, prevStateMeta);

if (log.isDebugEnabled())
debugResolve(ctx, useNew, oldEntry, newEntry);
Expand All @@ -117,7 +124,8 @@ public CacheVersionConflictResolverImpl(
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta
) {
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
return true;
Expand All @@ -139,8 +147,8 @@ protected <K, V> boolean isUseNew(
}

if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Object newVal = newEntry.value(ctx);
Object oldVal = oldEntry.value(ctx);

if (oldVal != null && newVal != null) {
try {
Expand All @@ -153,6 +161,17 @@ protected <K, V> boolean isUseNew(
);
}
}

Object field = oldVal != null ? value(oldVal) : null;

if (Objects.equals(field, prevStateMeta)) // Previous value synchronized.
return true;
}
else {
GridCacheVersion oldVer = oldEntry.value(ctx) != null ? oldEntry.version() : null; // TODO null value version (entry vs row)

if (Objects.equals(oldVer, prevStateMeta)) // Previous value synchronized.
return true;
}

log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
Expand All @@ -162,6 +181,30 @@ protected <K, V> boolean isUseNew(
return false;
}

/**
* {@inheritDoc}
*/
@Override public Object previousStateMetadata(GridCacheEntryEx entry) {
if (conflictResolveFieldEnabled) {
CacheObjectValueContext ctx = entry.context().cacheObjectContext();
CacheObject val = entry.rawGet();

return val != null ?
value(CacheObjectUtils.unwrapBinaryIfNeeded(ctx, val, true, true, null)) :
null;
}
else {
try {
GridCacheVersion ver = entry.version();

return ver != null ? ver.conflictVersion() : null;
}
catch (GridCacheEntryRemovedException e) { // TODO
throw new RuntimeException(e);
}
}
}

/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
return (val instanceof BinaryObject)
Expand Down
Loading

0 comments on commit 91f60e7

Please sign in to comment.