diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml index d82ac726d..9c9d0dc6d 100644 --- a/ArgusCore/pom.xml +++ b/ArgusCore/pom.xml @@ -189,7 +189,7 @@ org.apache.httpcomponents httpcore - 4.4 + 4.4.9 com.fasterxml.jackson.core @@ -202,7 +202,7 @@ com.fasterxml.jackson.core jackson-databind - + org.eclipse.persistence eclipselink @@ -262,7 +262,7 @@ org.apache.kafka kafka-clients - 0.8.2.1 + 0.8.2.1 org.scala-lang @@ -283,7 +283,7 @@ slf4j-log4j12 - 3.4.6 + 3.4.10 net.sf.jopt-simple @@ -341,7 +341,7 @@ org.apache.hbase hbase-client - 0.98.8-hadoop2 + 1.4.2 jersey-core @@ -390,7 +390,7 @@ org.apache.phoenix phoenix-core - 4.9.0-HBase-0.98 + 4.13.1-HBase-0.98 jersey-core @@ -422,7 +422,7 @@ org.apache.curator curator-framework - 2.10.0 + 2.11.1 org.apache.curator diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java index 5f8135662..cc0478aa6 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java @@ -54,7 +54,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -65,11 +67,13 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +110,7 @@ public class AbstractTSDBService extends DefaultService implements TSDBService { //~ Instance fields ****************************************************************************************************************************** private final ObjectMapper _mapper; protected final Logger _logger = LoggerFactory.getLogger(getClass()); - + private final String[] _writeEndpoints; protected final CloseableHttpClient _writeHttpClient; @@ -148,6 +152,8 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer Property.TSD_ENDPOINT_CONNECTION_TIMEOUT.getDefaultValue())); int socketTimeout = Integer.parseInt(config.getValue(Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getName(), Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getDefaultValue())); + int tsdbConnectionReuseCount=Integer.parseInt(config.getValue(Property.TSDB_READ_CONNECTION_REUSE_COUNT.getName(), + Property.TSDB_READ_CONNECTION_REUSE_COUNT.getDefaultValue())); _readEndPoints = Arrays.asList(config.getValue(Property.TSD_ENDPOINT_READ.getName(), Property.TSD_ENDPOINT_READ.getDefaultValue()).split(",")); @@ -164,7 +170,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer _writeEndpoints = config.getValue(Property.TSD_ENDPOINT_WRITE.getName(), Property.TSD_ENDPOINT_WRITE.getDefaultValue()).split(","); RETRY_COUNT = Integer.parseInt(config.getValue(Property.TSD_RETRY_COUNT.getName(), - Property.TSD_RETRY_COUNT.getDefaultValue())); + Property.TSD_RETRY_COUNT.getDefaultValue())); for(String writeEndpoint : _writeEndpoints) { requireArgument((writeEndpoint != null) && (!writeEndpoint.isEmpty()), "Illegal write endpoint URL."); @@ -176,16 +182,16 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer try { int index = 0; for (String readEndpoint : _readEndPoints) { - _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readEndpoint)); + _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount ,readEndpoint)); _readBackupEndPointsMap.put(readEndpoint, _readBackupEndPoints.get(index)); index ++; } for (String readBackupEndpoint : _readBackupEndPoints) { if (!readBackupEndpoint.isEmpty()) - _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readBackupEndpoint)); + _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint)); } - - _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout, _writeEndpoints); + + _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints); _roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator(); _executorService = Executors.newFixedThreadPool(connCount); @@ -318,21 +324,21 @@ public void putMetrics(List metrics) { } } - public void _retry(List objects, Iterator endPointIterator) { - for(int i=0;i void _retry(List objects, Iterator endPointIterator) { + for(int i=0;i annotations) { put(wrappers, endpoint + "/api/annotation/bulk", HttpMethod.POST); } catch(IOException ex) { _logger.warn("IOException while trying to push annotations", ex); - _retry(wrappers, _roundRobinIterator); + _retry(wrappers, _roundRobinIterator); } } } @@ -363,7 +369,7 @@ private ObjectMapper getMapper() { mapper.registerModule(module); return mapper; } - + /* Writes objects in chunks. */ private void put(List objects, String endpoint, HttpMethod method) throws IOException { if (objects != null) { @@ -386,7 +392,7 @@ private void put(List objects, String endpoint, HttpMethod method) throws } /* Helper to create the read and write clients. */ - protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, String...endpoints) throws MalformedURLException { + protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, int connectionReuseCount, String...endpoints) throws MalformedURLException { PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager(); connMgr.setMaxTotal(connCount); @@ -400,8 +406,11 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout( socketTimeout).build(); - - return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build(); + if(connectionReuseCount>0) { + return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build(); + }else { + return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build(); + } } /* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation @@ -491,7 +500,7 @@ protected T toEntity(String content, TypeReference type) { throw new SystemException(ex); } } - + /* Helper method to convert a Java entity to a JSON string. */ protected String fromEntity(T type) { try { @@ -523,7 +532,7 @@ protected String extractResponse(HttpResponse response) { /* Execute a request given by type requestType. */ protected HttpResponse executeHttpRequest(HttpMethod requestType, String url, CloseableHttpClient client, StringEntity entity) throws IOException { - + HttpResponse httpResponse = null; if (entity != null) { @@ -623,7 +632,8 @@ public enum Property { TSD_CONNECTION_COUNT("service.property.tsdb.connection.count", "2"), TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"), /** The TSDB backup read endpoint. */ - TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"); + TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"), + TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "2000"); private final String _name; private final String _defaultValue; @@ -833,5 +843,28 @@ public Map> getMetrics(List queries) { public List getAnnotations(List queries) { throw new UnsupportedOperationException("This method should be overriden by a specific implementation."); } + /** + * Used to close http connections after reusing the same connection for certain number of times + * @author rsarkapally + * + */ + class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{ + int connectionReuseCount; + AtomicInteger numOfTimesReused = new AtomicInteger(1); + public TSDBReadConnectionReuseStrategy(int connectionReuseCount) { + this.connectionReuseCount=connectionReuseCount; + } + + @Override + public boolean keepAlive(HttpResponse response, HttpContext context) { + HttpClientContext httpContext = (HttpClientContext) context; + _logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount()); + if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) { + numOfTimesReused.set(1); + return false; + } + return true; + } + } } /* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */ diff --git a/ArgusWebServices/pom.xml b/ArgusWebServices/pom.xml index 5d32673ed..9dfddd7b3 100644 --- a/ArgusWebServices/pom.xml +++ b/ArgusWebServices/pom.xml @@ -212,7 +212,7 @@ io.jsonwebtoken jjwt - 0.7.0 + 0.9.0 org.glassfish.jersey.inject diff --git a/pom.xml b/pom.xml index f3c52544a..897f4a2e1 100644 --- a/pom.xml +++ b/pom.xml @@ -118,9 +118,9 @@ UTF-8 yyyy-MM-dd HH:mm salesforce - 2.12.0 + 2.14.0 1.2.3 - 2.9.2 + 2.9.5 4.5.3 4.12