Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
add rmiUrl to debug message
Browse files Browse the repository at this point in the history
  • Loading branch information
Joerg-Schoemer committed May 6, 2022
1 parent 8276bf1 commit 2c44a85
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 196 deletions.
12 changes: 7 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/.classpath
/.project
/target
/.settings
/*.iml
.classpath
.project
target
.settings
*.iml

.idea
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;

/**
* A provider of Peer RMI addresses based off manual configuration.
Expand All @@ -38,10 +38,15 @@
* @author Greg Luck
* @version $Id$
*/
public final class ManualRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider {
public class ManualRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider {

private static final Logger LOG = LoggerFactory.getLogger(ManualRMICacheManagerPeerProvider.class.getName());

/**
* Contains a RMI URLs of the form: "//" + hostName + ":" + port + "/" + cacheName;
*/
protected final Set<String> peerUrls = Collections.synchronizedSet(new HashSet<>());

/**
* Empty constructor.
*/
Expand All @@ -52,7 +57,7 @@ public ManualRMICacheManagerPeerProvider() {
/**
* {@inheritDoc}
*/
public final void init() {
public void init() {
//nothing to do here
}

Expand All @@ -65,68 +70,37 @@ public long getTimeForClusterToForm() {
return 0;
}

/**
* Register a new peer.
*
* @param rmiUrl
*/
public final synchronized void registerPeer(String rmiUrl) {
peerUrls.put(rmiUrl, new Date());
@Override
public synchronized void registerPeer(String rmiUrl) {
peerUrls.add(rmiUrl);
}


/**
* @return a list of {@link CachePeer} peers, excluding the local peer.
*/
public final synchronized List listRemoteCachePeers(Ehcache cache) throws CacheException {
List remoteCachePeers = new ArrayList();
List staleList = new ArrayList();
for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
String rmiUrl = (String) iterator.next();
@Override
public synchronized List<CachePeer> listRemoteCachePeers(Ehcache cache) throws CacheException {
List<CachePeer> remoteCachePeers = new ArrayList<>();
for (String rmiUrl : peerUrls) {
String rmiUrlCacheName = extractCacheName(rmiUrl);

if (!rmiUrlCacheName.equals(cache.getName())) {
continue;
}
Date date = (Date) peerUrls.get(rmiUrl);
if (!stale(date)) {
CachePeer cachePeer = null;
try {
cachePeer = lookupRemoteCachePeer(rmiUrl);
remoteCachePeers.add(cachePeer);
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up rmiUrl " + rmiUrl + " through exception " + e.getMessage()
+ ". This may be normal if a node has gone offline. Or it may indicate network connectivity"
+ " difficulties", e);
}

try {
remoteCachePeers.add(lookupRemoteCachePeer(rmiUrl));
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking up rmiUrl {} through exception. This may be normal if a node has gone offline. Or it may indicate network connectivity difficulties",
rmiUrl, e);
}
} else {
LOG.debug("rmiUrl {} should never be stale for a manually configured cluster.", rmiUrl);
staleList.add(rmiUrl);
}

}

//Remove any stale remote peers. Must be done here to avoid concurrent modification exception.
for (int i = 0; i < staleList.size(); i++) {
String rmiUrl = (String) staleList.get(i);
peerUrls.remove(rmiUrl);
}
return remoteCachePeers;
}


/**
* Whether the entry should be considered stale.
* <p>
* Manual RMICacheManagerProviders use a static list of urls and are therefore never stale.
*
* @param date the date the entry was created
* @return true if stale
*/
protected final boolean stale(Date date) {
return false;
@Override
public void unregisterPeer(String rmiUrl) {
peerUrls.remove(rmiUrl);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.rmi.NotBoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;

/**
* A peer provider which discovers peers using Multicast.
Expand All @@ -55,7 +56,7 @@
* @author Greg Luck
* @version $Id$
*/
public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
public class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {

/**
* One tenth of a second, in ms
Expand All @@ -64,6 +65,10 @@ public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerP

private static final Logger LOG = LoggerFactory.getLogger(MulticastRMICacheManagerPeerProvider.class.getName());

/**
* Contains a RMI URLs of the form: "//" + hostName + ":" + port + "/" + cacheName as key
*/
protected final Map<String, CachePeerEntry> peerUrls = Collections.synchronizedMap(new HashMap<>());

private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
private final MulticastKeepaliveHeartbeatSender heartBeatSender;
Expand All @@ -79,18 +84,16 @@ public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddre
Integer groupMulticastPort, Integer timeToLive, InetAddress hostAddress) {
super(cacheManager);



heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(this, groupMulticastAddress,
groupMulticastPort, hostAddress);
heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress,
groupMulticastPort, timeToLive, hostAddress);
groupMulticastPort, timeToLive, hostAddress);
}

/**
* {@inheritDoc}
*/
public final void init() throws CacheException {
public void init() throws CacheException {
try {
heartBeatReceiver.init();
heartBeatSender.init();
Expand All @@ -105,11 +108,11 @@ public final void init() throws CacheException {
* <p>
* This method is thread-safe. It relies on peerUrls being a synchronizedMap
*
* @param rmiUrl
* @param rmiUrl the URL to register
*/
public final void registerPeer(String rmiUrl) {
public void registerPeer(String rmiUrl) {
try {
CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
CachePeerEntry cachePeerEntry = peerUrls.get(rmiUrl);
if (cachePeerEntry == null || stale(cachePeerEntry.date)) {
//can take seconds if there is a problem
CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
Expand All @@ -119,18 +122,12 @@ public final void registerPeer(String rmiUrl) {
} else {
cachePeerEntry.date = new Date();
}
} catch (IOException e) {
} catch (IOException | NotBoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
+ e.getMessage());
}
unregisterPeer(rmiUrl);
} catch (NotBoundException e) {
peerUrls.remove(rmiUrl);
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
+ e.getMessage());
}
} catch (Throwable t) {
LOG.error("Unable to lookup remote cache peer for " + rmiUrl
+ ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:" +
Expand All @@ -141,48 +138,42 @@ public final void registerPeer(String rmiUrl) {
/**
* @return a list of {@link CachePeer} peers, excluding the local peer.
*/
public final synchronized List listRemoteCachePeers(Ehcache cache) throws CacheException {
List remoteCachePeers = new ArrayList();
List staleList = new ArrayList();
synchronized (peerUrls) {
for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
String rmiUrl = (String) iterator.next();
String rmiUrlCacheName = extractCacheName(rmiUrl);
try {
if (!rmiUrlCacheName.equals(cache.getName())) {
continue;
}
CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
Date date = cachePeerEntry.date;
if (!stale(date)) {
CachePeer cachePeer = cachePeerEntry.cachePeer;
remoteCachePeers.add(cachePeer);
} else {

LOG.debug("rmiUrl is stale. Either the remote peer is shutdown or the " +
public synchronized List<CachePeer> listRemoteCachePeers(Ehcache cache) throws CacheException {
List<CachePeer> remoteCachePeers = new ArrayList<>();
List<String> staleList = new ArrayList<>();

for (Map.Entry<String, CachePeerEntry> entry : peerUrls.entrySet()) {
String rmiUrl = entry.getKey();
String rmiUrlCacheName = extractCacheName(rmiUrl);
if (!rmiUrlCacheName.equals(cache.getName())) {
continue;
}
try {
CachePeerEntry cachePeerEntry = entry.getValue();
if (!stale(cachePeerEntry.date)) {
remoteCachePeers.add(cachePeerEntry.cachePeer);
} else {
LOG.debug("rmiUrl '{}' is stale. Either the remote peer is shutdown or the " +
"network connectivity has been interrupted. Will be removed from list of remote cache peers",
rmiUrl);
staleList.add(rmiUrl);
}
} catch (Exception exception) {
LOG.error(exception.getMessage(), exception);
throw new CacheException("Unable to list remote cache peers. Error was " + exception.getMessage());
rmiUrl);
staleList.add(rmiUrl);
}
}
//Must remove entries after we have finished iterating over them
for (int i = 0; i < staleList.size(); i++) {
String rmiUrl = (String) staleList.get(i);
peerUrls.remove(rmiUrl);
} catch (Exception exception) {
throw new CacheException("Unable to list remote cache peers.", exception);
}
}

// Must remove entries after we have finished iterating over them
staleList.forEach(peerUrls::remove);

return remoteCachePeers;
}


/**
* Shutdown the heartbeat
*/
public final void dispose() {
public void dispose() {
heartBeatSender.dispose();
heartBeatReceiver.dispose();
}
Expand Down Expand Up @@ -211,51 +202,42 @@ protected long getStaleTime() {
* @param date the date the entry was created
* @return true if stale
*/
protected final boolean stale(Date date) {
protected boolean stale(Date date) {
long now = System.currentTimeMillis();
return date.getTime() < (now - getStaleTime());
}

public void unregisterPeer(String rmiUrl) {
peerUrls.remove(rmiUrl);
}

/**
* Entry containing a looked up CachePeer and date
*/
protected static final class CachePeerEntry {
private static class CachePeerEntry {

private final CachePeer cachePeer;

/**
* last access date
*/
private Date date;

/**
* Constructor
*
* @param cachePeer the cache peer part of this entry
* @param date the date part of this entry
*/
public CachePeerEntry(CachePeer cachePeer, Date date) {
private CachePeerEntry(CachePeer cachePeer, Date date) {
this.cachePeer = cachePeer;
this.date = date;
}

/**
* @return the cache peer part of this entry
*/
public final CachePeer getCachePeer() {
return cachePeer;
}


/**
* @return the date part of this entry
*/
public final Date getDate() {
return date;
}

}

/**
* @return the MulticastKeepaliveHeartbeatReceiver
*/
@SuppressWarnings("unused")
public MulticastKeepaliveHeartbeatReceiver getHeartBeatReceiver() {
return heartBeatReceiver;
}
Expand Down
Loading

0 comments on commit 2c44a85

Please sign in to comment.