diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml
index 9fe5bb767..3bba585d6 100644
--- a/ArgusCore/pom.xml
+++ b/ArgusCore/pom.xml
@@ -189,7 +189,7 @@
org.apache.httpcomponents
httpcore
- 4.4
+ 4.4.9
com.fasterxml.jackson.core
diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java
index 5f8135662..cc0478aa6 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/tsdb/AbstractTSDBService.java
@@ -54,7 +54,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
@@ -65,11 +67,13 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +110,7 @@ public class AbstractTSDBService extends DefaultService implements TSDBService {
//~ Instance fields ******************************************************************************************************************************
private final ObjectMapper _mapper;
protected final Logger _logger = LoggerFactory.getLogger(getClass());
-
+
private final String[] _writeEndpoints;
protected final CloseableHttpClient _writeHttpClient;
@@ -148,6 +152,8 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
Property.TSD_ENDPOINT_CONNECTION_TIMEOUT.getDefaultValue()));
int socketTimeout = Integer.parseInt(config.getValue(Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getName(),
Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getDefaultValue()));
+ int tsdbConnectionReuseCount=Integer.parseInt(config.getValue(Property.TSDB_READ_CONNECTION_REUSE_COUNT.getName(),
+ Property.TSDB_READ_CONNECTION_REUSE_COUNT.getDefaultValue()));
_readEndPoints = Arrays.asList(config.getValue(Property.TSD_ENDPOINT_READ.getName(), Property.TSD_ENDPOINT_READ.getDefaultValue()).split(","));
@@ -164,7 +170,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
_writeEndpoints = config.getValue(Property.TSD_ENDPOINT_WRITE.getName(), Property.TSD_ENDPOINT_WRITE.getDefaultValue()).split(",");
RETRY_COUNT = Integer.parseInt(config.getValue(Property.TSD_RETRY_COUNT.getName(),
- Property.TSD_RETRY_COUNT.getDefaultValue()));
+ Property.TSD_RETRY_COUNT.getDefaultValue()));
for(String writeEndpoint : _writeEndpoints) {
requireArgument((writeEndpoint != null) && (!writeEndpoint.isEmpty()), "Illegal write endpoint URL.");
@@ -176,16 +182,16 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
try {
int index = 0;
for (String readEndpoint : _readEndPoints) {
- _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readEndpoint));
+ _readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount ,readEndpoint));
_readBackupEndPointsMap.put(readEndpoint, _readBackupEndPoints.get(index));
index ++;
}
for (String readBackupEndpoint : _readBackupEndPoints) {
if (!readBackupEndpoint.isEmpty())
- _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readBackupEndpoint));
+ _readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint));
}
-
- _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout, _writeEndpoints);
+
+ _writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints);
_roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator();
_executorService = Executors.newFixedThreadPool(connCount);
@@ -318,21 +324,21 @@ public void putMetrics(List metrics) {
}
}
- public void _retry(List objects, Iterator endPointIterator) {
- for(int i=0;i void _retry(List objects, Iterator endPointIterator) {
+ for(int i=0;i annotations) {
put(wrappers, endpoint + "/api/annotation/bulk", HttpMethod.POST);
} catch(IOException ex) {
_logger.warn("IOException while trying to push annotations", ex);
- _retry(wrappers, _roundRobinIterator);
+ _retry(wrappers, _roundRobinIterator);
}
}
}
@@ -363,7 +369,7 @@ private ObjectMapper getMapper() {
mapper.registerModule(module);
return mapper;
}
-
+
/* Writes objects in chunks. */
private void put(List objects, String endpoint, HttpMethod method) throws IOException {
if (objects != null) {
@@ -386,7 +392,7 @@ private void put(List objects, String endpoint, HttpMethod method) throws
}
/* Helper to create the read and write clients. */
- protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, String...endpoints) throws MalformedURLException {
+ protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, int connectionReuseCount, String...endpoints) throws MalformedURLException {
PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(connCount);
@@ -400,8 +406,11 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock
RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout(
socketTimeout).build();
-
- return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
+ if(connectionReuseCount>0) {
+ return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build();
+ }else {
+ return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
+ }
}
/* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation
@@ -491,7 +500,7 @@ protected T toEntity(String content, TypeReference type) {
throw new SystemException(ex);
}
}
-
+
/* Helper method to convert a Java entity to a JSON string. */
protected String fromEntity(T type) {
try {
@@ -523,7 +532,7 @@ protected String extractResponse(HttpResponse response) {
/* Execute a request given by type requestType. */
protected HttpResponse executeHttpRequest(HttpMethod requestType, String url, CloseableHttpClient client, StringEntity entity) throws IOException {
-
+
HttpResponse httpResponse = null;
if (entity != null) {
@@ -623,7 +632,8 @@ public enum Property {
TSD_CONNECTION_COUNT("service.property.tsdb.connection.count", "2"),
TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"),
/** The TSDB backup read endpoint. */
- TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467");
+ TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"),
+ TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "2000");
private final String _name;
private final String _defaultValue;
@@ -833,5 +843,28 @@ public Map> getMetrics(List queries) {
public List getAnnotations(List queries) {
throw new UnsupportedOperationException("This method should be overriden by a specific implementation.");
}
+ /**
+ * Used to close http connections after reusing the same connection for certain number of times
+ * @author rsarkapally
+ *
+ */
+ class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{
+ int connectionReuseCount;
+ AtomicInteger numOfTimesReused = new AtomicInteger(1);
+ public TSDBReadConnectionReuseStrategy(int connectionReuseCount) {
+ this.connectionReuseCount=connectionReuseCount;
+ }
+
+ @Override
+ public boolean keepAlive(HttpResponse response, HttpContext context) {
+ HttpClientContext httpContext = (HttpClientContext) context;
+ _logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount());
+ if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) {
+ numOfTimesReused.set(1);
+ return false;
+ }
+ return true;
+ }
+ }
}
/* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */