Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Merge branch 'es_changes' of github.com:dilipdevaraj-sfdc/Argus-1 int…
Browse files Browse the repository at this point in the history
…o es_changes
  • Loading branch information
dilipdevaraj-sfdc committed Jun 7, 2018
2 parents d5c1c80 + a3e6e02 commit 7639453
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 39 deletions.
14 changes: 7 additions & 7 deletions ArgusCore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4</version>
<version>4.4.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand All @@ -202,7 +202,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependency>
<dependency>
<groupId>org.eclipse.persistence</groupId>
<artifactId>eclipselink</artifactId>
Expand Down Expand Up @@ -262,7 +262,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
Expand All @@ -283,7 +283,7 @@
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
<version>3.4.6</version>
<version>3.4.10</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
Expand Down Expand Up @@ -341,7 +341,7 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.8-hadoop2</version>
<version>1.4.2</version>
<exclusions>
<exclusion>
<artifactId>jersey-core</artifactId>
Expand Down Expand Up @@ -390,7 +390,7 @@
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.9.0-HBase-0.98</version>
<version>4.13.1-HBase-0.98</version>
<exclusions>
<exclusion>
<artifactId>jersey-core</artifactId>
Expand Down Expand Up @@ -422,7 +422,7 @@
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
Expand All @@ -65,11 +67,13 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,7 +110,7 @@ public class AbstractTSDBService extends DefaultService implements TSDBService {
//~ Instance fields ******************************************************************************************************************************
private final ObjectMapper _mapper;
protected final Logger _logger = LoggerFactory.getLogger(getClass());

private final String[] _writeEndpoints;
protected final CloseableHttpClient _writeHttpClient;

Expand Down Expand Up @@ -148,6 +152,8 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
Property.TSD_ENDPOINT_CONNECTION_TIMEOUT.getDefaultValue()));
int socketTimeout = Integer.parseInt(config.getValue(Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getName(),
Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getDefaultValue()));
int tsdbConnectionReuseCount=Integer.parseInt(config.getValue(Property.TSDB_READ_CONNECTION_REUSE_COUNT.getName(),
Property.TSDB_READ_CONNECTION_REUSE_COUNT.getDefaultValue()));

_readEndPoints = Arrays.asList(config.getValue(Property.TSD_ENDPOINT_READ.getName(), Property.TSD_ENDPOINT_READ.getDefaultValue()).split(","));

Expand All @@ -164,7 +170,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer

_writeEndpoints = config.getValue(Property.TSD_ENDPOINT_WRITE.getName(), Property.TSD_ENDPOINT_WRITE.getDefaultValue()).split(",");
RETRY_COUNT = Integer.parseInt(config.getValue(Property.TSD_RETRY_COUNT.getName(),
Property.TSD_RETRY_COUNT.getDefaultValue()));
Property.TSD_RETRY_COUNT.getDefaultValue()));

for(String writeEndpoint : _writeEndpoints) {
requireArgument((writeEndpoint != null) && (!writeEndpoint.isEmpty()), "Illegal write endpoint URL.");
Expand All @@ -176,16 +182,16 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
try {
int index = 0;
for (String readEndpoint : _readEndPoints) {
_readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readEndpoint));
_readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount ,readEndpoint));
_readBackupEndPointsMap.put(readEndpoint, _readBackupEndPoints.get(index));
index ++;
}
for (String readBackupEndpoint : _readBackupEndPoints) {
if (!readBackupEndpoint.isEmpty())
_readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readBackupEndpoint));
_readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint));
}
_writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout, _writeEndpoints);

_writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints);

_roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator();
_executorService = Executors.newFixedThreadPool(connCount);
Expand Down Expand Up @@ -318,21 +324,21 @@ public void putMetrics(List<Metric> metrics) {
}
}

public <T> void _retry(List<T> objects, Iterator<String> endPointIterator) {
for(int i=0;i<RETRY_COUNT;i++) {
try {
String endpoint=endPointIterator.next();
_logger.info("Retrying using endpoint {}.", endpoint);
put(objects, endpoint + "/api/put", HttpMethod.POST);
return;
} catch(IOException ex) {
_logger.warn("IOException while trying to push data. We will retry for {} more times",RETRY_COUNT-i);
}
}
_logger.error("Retried for {} times and we still failed. Dropping this chunk of data.", RETRY_COUNT);
}
public <T> void _retry(List<T> objects, Iterator<String> endPointIterator) {
for(int i=0;i<RETRY_COUNT;i++) {
try {
String endpoint=endPointIterator.next();
_logger.info("Retrying using endpoint {}.", endpoint);
put(objects, endpoint + "/api/put", HttpMethod.POST);
return;
} catch(IOException ex) {
_logger.warn("IOException while trying to push data. We will retry for {} more times",RETRY_COUNT-i);
}
}

_logger.error("Retried for {} times and we still failed. Dropping this chunk of data.", RETRY_COUNT);

}

/** @see TSDBService#putAnnotations(java.util.List) */
@Override
Expand All @@ -346,7 +352,7 @@ public void putAnnotations(List<Annotation> annotations) {
put(wrappers, endpoint + "/api/annotation/bulk", HttpMethod.POST);
} catch(IOException ex) {
_logger.warn("IOException while trying to push annotations", ex);
_retry(wrappers, _roundRobinIterator);
_retry(wrappers, _roundRobinIterator);
}
}
}
Expand All @@ -363,7 +369,7 @@ private ObjectMapper getMapper() {
mapper.registerModule(module);
return mapper;
}

/* Writes objects in chunks. */
private <T> void put(List<T> objects, String endpoint, HttpMethod method) throws IOException {
if (objects != null) {
Expand All @@ -386,7 +392,7 @@ private <T> void put(List<T> objects, String endpoint, HttpMethod method) throws
}

/* Helper to create the read and write clients. */
protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, String...endpoints) throws MalformedURLException {
protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, int connectionReuseCount, String...endpoints) throws MalformedURLException {
PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(connCount);

Expand All @@ -400,8 +406,11 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock

RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout(
socketTimeout).build();

return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
if(connectionReuseCount>0) {
return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build();
}else {
return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
}
}

/* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation
Expand Down Expand Up @@ -491,7 +500,7 @@ protected <T> T toEntity(String content, TypeReference<T> type) {
throw new SystemException(ex);
}
}

/* Helper method to convert a Java entity to a JSON string. */
protected <T> String fromEntity(T type) {
try {
Expand Down Expand Up @@ -523,7 +532,7 @@ protected String extractResponse(HttpResponse response) {

/* Execute a request given by type requestType. */
protected HttpResponse executeHttpRequest(HttpMethod requestType, String url, CloseableHttpClient client, StringEntity entity) throws IOException {

HttpResponse httpResponse = null;

if (entity != null) {
Expand Down Expand Up @@ -623,7 +632,8 @@ public enum Property {
TSD_CONNECTION_COUNT("service.property.tsdb.connection.count", "2"),
TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"),
/** The TSDB backup read endpoint. */
TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467");
TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"),
TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "2000");

private final String _name;
private final String _defaultValue;
Expand Down Expand Up @@ -833,5 +843,28 @@ public Map<MetricQuery, List<Metric>> getMetrics(List<MetricQuery> queries) {
public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
throw new UnsupportedOperationException("This method should be overriden by a specific implementation.");
}
/**
* Used to close http connections after reusing the same connection for certain number of times
* @author rsarkapally
*
*/
class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{
int connectionReuseCount;
AtomicInteger numOfTimesReused = new AtomicInteger(1);
public TSDBReadConnectionReuseStrategy(int connectionReuseCount) {
this.connectionReuseCount=connectionReuseCount;
}

@Override
public boolean keepAlive(HttpResponse response, HttpContext context) {
HttpClientContext httpContext = (HttpClientContext) context;
_logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount());
if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) {
numOfTimesReused.set(1);
return false;
}
return true;
}
}
}
/* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */
2 changes: 1 addition & 1 deletion ArgusWebServices/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.7.0</version>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
<dockerImages.base.name>salesforce</dockerImages.base.name>
<project.version>2.12.0</project.version>
<project.version>2.14.0</project.version>
<logback.version>1.2.3</logback.version>
<jackson.version>2.9.2</jackson.version>
<jackson.version>2.9.5</jackson.version>
<httpclient.version>4.5.3</httpclient.version>
<junit.version>4.12</junit.version>
</properties>
Expand Down

0 comments on commit 7639453

Please sign in to comment.