Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #752 from rajsarkapally/tsdb_load_balancing
Browse files Browse the repository at this point in the history
TSDB read load balancing
  • Loading branch information
rajsarkapally authored Jun 5, 2018
2 parents 80b11cf + 3d2fc96 commit 7a8e44f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 30 deletions.
2 changes: 1 addition & 1 deletion ArgusCore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4</version>
<version>4.4.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
Expand All @@ -65,11 +67,13 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,7 +110,7 @@ public class AbstractTSDBService extends DefaultService implements TSDBService {
//~ Instance fields ******************************************************************************************************************************
private final ObjectMapper _mapper;
protected final Logger _logger = LoggerFactory.getLogger(getClass());

private final String[] _writeEndpoints;
protected final CloseableHttpClient _writeHttpClient;

Expand Down Expand Up @@ -148,6 +152,8 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
Property.TSD_ENDPOINT_CONNECTION_TIMEOUT.getDefaultValue()));
int socketTimeout = Integer.parseInt(config.getValue(Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getName(),
Property.TSD_ENDPOINT_SOCKET_TIMEOUT.getDefaultValue()));
int tsdbConnectionReuseCount=Integer.parseInt(config.getValue(Property.TSDB_READ_CONNECTION_REUSE_COUNT.getName(),
Property.TSDB_READ_CONNECTION_REUSE_COUNT.getDefaultValue()));

_readEndPoints = Arrays.asList(config.getValue(Property.TSD_ENDPOINT_READ.getName(), Property.TSD_ENDPOINT_READ.getDefaultValue()).split(","));

Expand All @@ -164,7 +170,7 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer

_writeEndpoints = config.getValue(Property.TSD_ENDPOINT_WRITE.getName(), Property.TSD_ENDPOINT_WRITE.getDefaultValue()).split(",");
RETRY_COUNT = Integer.parseInt(config.getValue(Property.TSD_RETRY_COUNT.getName(),
Property.TSD_RETRY_COUNT.getDefaultValue()));
Property.TSD_RETRY_COUNT.getDefaultValue()));

for(String writeEndpoint : _writeEndpoints) {
requireArgument((writeEndpoint != null) && (!writeEndpoint.isEmpty()), "Illegal write endpoint URL.");
Expand All @@ -176,16 +182,16 @@ public AbstractTSDBService(SystemConfiguration config, MonitorService monitorSer
try {
int index = 0;
for (String readEndpoint : _readEndPoints) {
_readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readEndpoint));
_readPortMap.put(readEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount ,readEndpoint));
_readBackupEndPointsMap.put(readEndpoint, _readBackupEndPoints.get(index));
index ++;
}
for (String readBackupEndpoint : _readBackupEndPoints) {
if (!readBackupEndpoint.isEmpty())
_readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,readBackupEndpoint));
_readPortMap.put(readBackupEndpoint, getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, readBackupEndpoint));
}
_writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout, _writeEndpoints);

_writeHttpClient = getClient(connCount / 2, connTimeout, socketTimeout,tsdbConnectionReuseCount, _writeEndpoints);

_roundRobinIterator = Iterables.cycle(_writeEndpoints).iterator();
_executorService = Executors.newFixedThreadPool(connCount);
Expand Down Expand Up @@ -318,21 +324,21 @@ public void putMetrics(List<Metric> metrics) {
}
}

public <T> void _retry(List<T> objects, Iterator<String> endPointIterator) {
for(int i=0;i<RETRY_COUNT;i++) {
try {
String endpoint=endPointIterator.next();
_logger.info("Retrying using endpoint {}.", endpoint);
put(objects, endpoint + "/api/put", HttpMethod.POST);
return;
} catch(IOException ex) {
_logger.warn("IOException while trying to push data. We will retry for {} more times",RETRY_COUNT-i);
}
}
_logger.error("Retried for {} times and we still failed. Dropping this chunk of data.", RETRY_COUNT);
}
public <T> void _retry(List<T> objects, Iterator<String> endPointIterator) {
for(int i=0;i<RETRY_COUNT;i++) {
try {
String endpoint=endPointIterator.next();
_logger.info("Retrying using endpoint {}.", endpoint);
put(objects, endpoint + "/api/put", HttpMethod.POST);
return;
} catch(IOException ex) {
_logger.warn("IOException while trying to push data. We will retry for {} more times",RETRY_COUNT-i);
}
}

_logger.error("Retried for {} times and we still failed. Dropping this chunk of data.", RETRY_COUNT);

}

/** @see TSDBService#putAnnotations(java.util.List) */
@Override
Expand All @@ -346,7 +352,7 @@ public void putAnnotations(List<Annotation> annotations) {
put(wrappers, endpoint + "/api/annotation/bulk", HttpMethod.POST);
} catch(IOException ex) {
_logger.warn("IOException while trying to push annotations", ex);
_retry(wrappers, _roundRobinIterator);
_retry(wrappers, _roundRobinIterator);
}
}
}
Expand All @@ -363,7 +369,7 @@ private ObjectMapper getMapper() {
mapper.registerModule(module);
return mapper;
}

/* Writes objects in chunks. */
private <T> void put(List<T> objects, String endpoint, HttpMethod method) throws IOException {
if (objects != null) {
Expand All @@ -386,7 +392,7 @@ private <T> void put(List<T> objects, String endpoint, HttpMethod method) throws
}

/* Helper to create the read and write clients. */
protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, String...endpoints) throws MalformedURLException {
protected CloseableHttpClient getClient(int connCount, int connTimeout, int socketTimeout, int connectionReuseCount, String...endpoints) throws MalformedURLException {
PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(connCount);

Expand All @@ -400,8 +406,11 @@ protected CloseableHttpClient getClient(int connCount, int connTimeout, int sock

RequestConfig reqConfig = RequestConfig.custom().setConnectionRequestTimeout(connTimeout).setConnectTimeout(connTimeout).setSocketTimeout(
socketTimeout).build();

return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
if(connectionReuseCount>0) {
return HttpClients.custom().setConnectionManager(connMgr).setConnectionReuseStrategy(new TSDBReadConnectionReuseStrategy(connectionReuseCount)).setDefaultRequestConfig(reqConfig).build();
}else {
return HttpClients.custom().setConnectionManager(connMgr).setDefaultRequestConfig(reqConfig).build();
}
}

/* Converts a list of annotations into a list of annotation wrappers for use in serialization. Resulting list is sorted by target annotation
Expand Down Expand Up @@ -491,7 +500,7 @@ protected <T> T toEntity(String content, TypeReference<T> type) {
throw new SystemException(ex);
}
}

/* Helper method to convert a Java entity to a JSON string. */
protected <T> String fromEntity(T type) {
try {
Expand Down Expand Up @@ -523,7 +532,7 @@ protected String extractResponse(HttpResponse response) {

/* Execute a request given by type requestType. */
protected HttpResponse executeHttpRequest(HttpMethod requestType, String url, CloseableHttpClient client, StringEntity entity) throws IOException {

HttpResponse httpResponse = null;

if (entity != null) {
Expand Down Expand Up @@ -623,7 +632,8 @@ public enum Property {
TSD_CONNECTION_COUNT("service.property.tsdb.connection.count", "2"),
TSD_RETRY_COUNT("service.property.tsdb.retry.count", "3"),
/** The TSDB backup read endpoint. */
TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467");
TSD_ENDPOINT_BACKUP_READ("service.property.tsdb.endpoint.backup.read", "http://localhost:4466,http://localhost:4467"),
TSDB_READ_CONNECTION_REUSE_COUNT("service.property.tsdb.read.connection.reuse.count", "2000");

private final String _name;
private final String _defaultValue;
Expand Down Expand Up @@ -833,5 +843,28 @@ public Map<MetricQuery, List<Metric>> getMetrics(List<MetricQuery> queries) {
public List<Annotation> getAnnotations(List<AnnotationQuery> queries) {
throw new UnsupportedOperationException("This method should be overriden by a specific implementation.");
}
/**
* Used to close http connections after reusing the same connection for certain number of times
* @author rsarkapally
*
*/
class TSDBReadConnectionReuseStrategy implements ConnectionReuseStrategy{
int connectionReuseCount;
AtomicInteger numOfTimesReused = new AtomicInteger(1);
public TSDBReadConnectionReuseStrategy(int connectionReuseCount) {
this.connectionReuseCount=connectionReuseCount;
}

@Override
public boolean keepAlive(HttpResponse response, HttpContext context) {
HttpClientContext httpContext = (HttpClientContext) context;
_logger.debug("http connection {} reused for {} times", httpContext.getConnection(), httpContext.getConnection().getMetrics().getRequestCount());
if (numOfTimesReused.getAndIncrement() % connectionReuseCount == 0) {
numOfTimesReused.set(1);
return false;
}
return true;
}
}
}
/* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */

0 comments on commit 7a8e44f

Please sign in to comment.