diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml index 3bba585d6..9c9d0dc6d 100644 --- a/ArgusCore/pom.xml +++ b/ArgusCore/pom.xml @@ -309,7 +309,7 @@ com.google.guava guava - 19.0 + 23.0 org.apache.commons diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index c6d29387a..ffb87f857 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -1,21 +1,23 @@ package com.salesforce.dva.argus.service.schema; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryPoolMXBean; -import java.lang.management.MemoryType; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.List; import java.util.Map.Entry; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; -import com.googlecode.concurrenttrees.radix.RadixTree; -import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory; -import com.googlecode.concurrenttrees.radix.node.concrete.voidvalue.VoidValue; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; + import com.salesforce.dva.argus.entity.KeywordQuery; import com.salesforce.dva.argus.entity.Metric; import com.salesforce.dva.argus.entity.MetricSchemaRecord; @@ -26,245 +28,257 @@ import com.salesforce.dva.argus.system.SystemConfiguration; public abstract class AbstractSchemaService extends DefaultService implements SchemaService { - - private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); - private static final long POLL_INTERVAL_MS = 60 * 1000L; - protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); - - private static boolean _writesToTrieEnabled = true; - - private final Logger _logger = LoggerFactory.getLogger(getClass()); - private final Thread _oldGenMonitorThread; - private final boolean _cacheEnabled; - protected final boolean _syncPut; + private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; + private static final int DAY_IN_SECONDS = 24 * 60 * 60; + private static final int HOUR_IN_SECONDS = 60 * 60; + protected static BloomFilter bloomFilter; + private Random rand = new Random(); + private int randomNumber = rand.nextInt(); + private int bloomFilterExpectedNumberInsertions; + private double bloomFilterErrorRate; + private final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Thread _bloomFilterMonitorThread; + protected final boolean _syncPut; + private int bloomFilterFlushHourToStartAt; + private ScheduledExecutorService scheduledExecutorService; protected AbstractSchemaService(SystemConfiguration config) { super(config); - - _cacheEnabled = Boolean.parseBoolean( - config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue())); - _syncPut = Boolean.parseBoolean( - config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue())); - - _oldGenMonitorThread = new Thread(new OldGenMonitorThread(), "old-gen-monitor"); - if(_cacheEnabled) 
{ - _oldGenMonitorThread.start(); - } + + bloomFilterExpectedNumberInsertions = Integer.parseInt(config.getValue(Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getName(), + Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getDefaultValue())); + bloomFilterErrorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(), + Property.BLOOMFILTER_ERROR_RATE.getDefaultValue())); + bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions, bloomFilterErrorRate); + + _syncPut = Boolean.parseBoolean( + config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue())); + + _bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor"); + _bloomFilterMonitorThread.start(); + + bloomFilterFlushHourToStartAt = Integer.parseInt(config.getValue(Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getName(), + Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getDefaultValue())); + createScheduledExecutorService(bloomFilterFlushHourToStartAt); } @Override public void put(Metric metric) { requireNotDisposed(); SystemAssert.requireArgument(metric != null, "Metric cannot be null."); - + put(Arrays.asList(metric)); } @Override public void put(List metrics) { requireNotDisposed(); - SystemAssert.requireArgument(metrics != null, "Metric list cannot be null."); - - //If cache is not enabled, call implementation specific put with the list of metrics. - if(!_cacheEnabled) { - implementationSpecificPut(metrics); - return; - } - - //If cache is enabled, create a list of metricsToPut that do not exist on the TRIE and then call implementation - // specific put with only those subset of metricsToPut. - List metricsToPut = new ArrayList<>(metrics.size()); - + SystemAssert.requireArgument(metrics != null, "Metric list cannot be null."); + + // Create a list of metricsToPut that do not exist in the bloom filter, and then call the implementation- + // specific put with only that subset of metrics.
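For reference, a minimal sketch (not part of the patch) of the Guava check-then-write pattern the new put() relies on. The Guava bump to 23.0 in the pom above matters here because BloomFilter#approximateElementCount(), used by the monitor thread further down, is only available from 23.0 on:

    BloomFilter<CharSequence> filter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()), 40, 0.00001);

    if (!filter.mightContain(key)) {    // false means the key was definitely never put()
        writeSchemaToEs(metric);        // hypothetical write call
        filter.put(key);                // the patch defers this step to _addToBloomFilter(),
    }                                   // so only keys that actually reached ES are recorded

A false positive (mightContain() returning true for a key that was never written) silently drops a schema record; the per-instance random salt appended in constructKey() below keeps such misses uncorrelated across schema clients.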
+ List metricsToPut = new ArrayList<>(metrics.size()); + for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { - String key = constructTrieKey(metric, null); - boolean found = TRIE.getValueForExactKey(key) != null; - if(!found) { - metricsToPut.add(metric); - if(_writesToTrieEnabled) { - TRIE.putIfAbsent(key, VoidValue.SINGLETON); - } - } + String key = constructKey(metric, null); + boolean found = bloomFilter.mightContain(key); + if(!found) { + metricsToPut.add(metric); + } } else { boolean newTags = false; for(Entry tagEntry : metric.getTags().entrySet()) { - String key = constructTrieKey(metric, tagEntry); - boolean found = TRIE.getValueForExactKey(key) != null; - if(!found) { - newTags = true; - if(_writesToTrieEnabled) { - TRIE.putIfAbsent(key, VoidValue.SINGLETON); - } - } + String key = constructKey(metric, tagEntry); + boolean found = bloomFilter.mightContain(key); + if(!found) { + newTags = true; + } } - + if(newTags) { metricsToPut.add(metric); } } } - + implementationSpecificPut(metricsToPut); } - - + protected abstract void implementationSpecificPut(List metrics); - protected String constructTrieKey(Metric metric, Entry tagEntry) { + @Override + public void dispose() { + requireNotDisposed(); + if (_bloomFilterMonitorThread != null && _bloomFilterMonitorThread.isAlive()) { + _logger.info("Stopping bloom filter monitor thread."); + _bloomFilterMonitorThread.interrupt(); + _logger.info("Bloom filter monitor thread interrupted."); + try { + _logger.info("Waiting for bloom filter monitor thread to terminate."); + _bloomFilterMonitorThread.join(); + } catch (InterruptedException ex) { + _logger.warn("Bloom filter monitor thread was interrupted while shutting down."); + } + _logger.info("System monitoring stopped."); + } else { + _logger.info("Requested shutdown of bloom filter monitor thread aborted, as it is not yet running."); + } + shutdownScheduledExecutorService(); + } + + @Override + public abstract Properties getServiceProperties(); + + @Override + public abstract List get(MetricSchemaRecordQuery query); + + @Override + public abstract List getUnique(MetricSchemaRecordQuery query, RecordType type); + + @Override + public abstract List keywordSearch(KeywordQuery query); + + protected String constructKey(Metric metric, Entry tagEntry) { StringBuilder sb = new StringBuilder(metric.getScope()); sb.append('\0').append(metric.getMetric()); - + if(metric.getNamespace() != null) { sb.append('\0').append(metric.getNamespace()); } - + if(tagEntry != null) { sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue()); } - + + // Append a per-instance random salt: bloom filters on different schema clients then produce + // uncorrelated false positives, so the same metric schema cannot be skipped (and never written to ES) by every client at once + sb.append('\0').append(randomNumber); + return sb.toString(); } - - protected String constructTrieKey(String scope, String metric, String tagk, String tagv, String namespace) { + + protected String constructKey(String scope, String metric, String tagk, String tagv, String namespace) { StringBuilder sb = new StringBuilder(scope); sb.append('\0').append(metric); - + if(namespace != null) { sb.append('\0').append(namespace); } - + if(tagk != null) { sb.append('\0').append(tagk); } - + if(tagv != null) { sb.append('\0').append(tagv); } - + + // Append a per-instance random salt: bloom filters on different schema clients then produce + // uncorrelated false positives, so the same metric schema cannot be skipped (and never written to ES) by every client at once + sb.append('\0').append(randomNumber); + return
sb.toString(); } - - @Override - public void dispose() { - requireNotDisposed(); - if (_oldGenMonitorThread != null && _oldGenMonitorThread.isAlive()) { - _logger.info("Stopping old gen monitor thread."); - _oldGenMonitorThread.interrupt(); - _logger.info("Old gen monitor thread interrupted."); - try { - _logger.info("Waiting for old gen monitor thread to terminate."); - _oldGenMonitorThread.join(); - } catch (InterruptedException ex) { - _logger.warn("Old gen monitor thread was interrupted while shutting down."); - } - _logger.info("System monitoring stopped."); - } else { - _logger.info("Requested shutdown of old gen monitor thread aborted, as it is not yet running."); - } + private void createScheduledExecutorService(int targetHourToStartAt){ + scheduledExecutorService = Executors.newScheduledThreadPool(1); + int initialDelayInSeconds = getNumHoursUntilTargetHour(targetHourToStartAt) * HOUR_IN_SECONDS; + BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread(); + scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, DAY_IN_SECONDS, TimeUnit.SECONDS); } - @Override - public abstract Properties getServiceProperties(); - - @Override - public abstract List get(MetricSchemaRecordQuery query); + private void shutdownScheduledExecutorService(){ + _logger.info("Shutting down scheduled bloom filter flush executor service"); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + _logger.warn("Shutdown of executor service was interrupted."); + Thread.currentThread().interrupt(); + } + } - @Override - public abstract List getUnique(MetricSchemaRecordQuery query, RecordType type); + protected int getNumHoursUntilTargetHour(int targetHour){ + _logger.info("Initialized bloom filter flushing out, at {} hour of day", targetHour); + Calendar calendar = Calendar.getInstance(); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + return hour < targetHour ? (targetHour - hour) : (targetHour + 24 - hour); + } - @Override - public abstract List keywordSearch(KeywordQuery query); - - /** - * The set of implementation specific configuration properties. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - public enum Property { - - /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists, - * and if it does then do not rewrite. Provide more heap space when using this option. */ - CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"), - SYNC_PUT("service.property.schema.sync.put", "false"); - - private final String _name; - private final String _defaultValue; - - private Property(String name, String defaultValue) { - _name = name; - _defaultValue = defaultValue; - } - - /** - * Returns the property name. - * - * @return The property name. - */ - public String getName() { - return _name; - } - - /** - * Returns the default value for the property. - * - * @return The default value. - */ - public String getDefaultValue() { - return _defaultValue; - } - } - - - //~ Inner Classes ******************************************************************************************************************************** - - /** - * Old Generation monitoring thread. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - private class OldGenMonitorThread implements Runnable { + * The set of implementation specific configuration properties. 
+ * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + public enum Property { + SYNC_PUT("service.property.schema.sync.put", "false"), + BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "40"), + BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"), + /* + * Have a different configured flush start hour for different machines to prevent thundering herd problem. + */ + BLOOM_FILTER_FLUSH_HOUR_TO_START_AT("service.property.schema.bloomfilter.flush.hour.to.start.at","2"); + + private final String _name; + private final String _defaultValue; + private Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. + */ + public String getDefaultValue() { + return _defaultValue; + } + } + + + //~ Inner Classes ******************************************************************************************************************************** + + /** + * Bloom Filter monitoring thread. + * + * @author Dilip Devaraj (ddevaraj@salesforce.com) + */ + private class BloomFilterMonitorThread implements Runnable { @Override public void run() { + _logger.info("Initialized random number for bloom filter key = {}", randomNumber); while (!Thread.currentThread().isInterrupted()) { _sleepForPollPeriod(); if (!Thread.currentThread().isInterrupted()) { try { - _checkOldGenUsage(); + _checkBloomFilterUsage(); } catch (Exception ex) { - _logger.warn("Exception occurred while checking old generation usage.", ex); + _logger.warn("Exception occurred while checking bloom filter usage.", ex); } } } } - private void _checkOldGenUsage() { - List memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); - for (MemoryPoolMXBean bean : memoryPoolBeans) { - if (bean.getType() == MemoryType.HEAP) { - String name = bean.getName().toLowerCase(); - if (name.contains("old") || name.contains("tenured")) { - long oldGenUsed = bean.getUsage().getUsed(); - _logger.info("Old Gen Memory = {} bytes", oldGenUsed); - _logger.info("Max JVM Memory = {} bytes", MAX_MEMORY); - if (oldGenUsed > 0.90 * MAX_MEMORY) { - _logger.info("JVM heap memory usage has exceeded 90% of the allocated heap memory. Disabling writes to TRIE."); - _writesToTrieEnabled = false; - } else if(oldGenUsed < 0.50 * MAX_MEMORY && !_writesToTrieEnabled) { - _logger.info("JVM heap memory usage is below 50% of the allocated heap memory and writes to TRIE is disabled. " - + "Enabling writes to TRIE now."); - _writesToTrieEnabled = true; - } - } - } - } + private void _checkBloomFilterUsage() { + _logger.info("Bloom approx no. 
elements = {}", bloomFilter.approximateElementCount()); + _logger.info("Bloom expected error rate = {}", bloomFilter.expectedFpp()); } private void _sleepForPollPeriod() { try { - _logger.info("Sleeping for {}s before checking old gen usage.", POLL_INTERVAL_MS / 1000); + _logger.info("Sleeping for {}s before checking bloom filter statistics.", POLL_INTERVAL_MS / 1000); Thread.sleep(POLL_INTERVAL_MS); } catch (InterruptedException ex) { _logger.warn("AbstractSchemaService bloom filter monitor thread was interrupted."); @@ -273,4 +287,21 @@ private void _sleepForPollPeriod() { } } + private class BloomFilterFlushThread implements Runnable { + @Override + public void run() { + try { + _flushBloomFilter(); + } catch (Exception ex) { + _logger.warn("Exception occurred while flushing bloom filter.", ex); + } + } + + private void _flushBloomFilter() { + _logger.info("Flushing out bloom filter entries"); + bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions, bloomFilterErrorRate); + /* No explicit synchronization here: the occasional race between replacing the filter and the salt is harmless, and avoiding locks keeps put() fast. */ + randomNumber = rand.nextInt(); + } + } } diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 3f0a1f650..f104dd37c 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -65,7 +65,7 @@ */ @Singleton public class ElasticSearchSchemaService extends AbstractSchemaService { - + private static final String INDEX_NAME = "metadata_index"; private static final String TYPE_NAME = "metadata_type"; private static final String KEEP_SCROLL_CONTEXT_OPEN_FOR = "1m"; @@ -73,23 +73,23 @@ public class ElasticSearchSchemaService extends AbstractSchemaService { private static final int MAX_RETRY_TIMEOUT = 300 * 1000; private static final String FIELD_TYPE_TEXT = "text"; private static final String FIELD_TYPE_DATE = "date"; - + private final ObjectMapper _mapper; - private Logger _logger = LoggerFactory.getLogger(getClass()); - private final MonitorService _monitorService; - private final RestClient _esRestClient; - private final int _replicationFactor; + private Logger _logger = LoggerFactory.getLogger(getClass()); + private final MonitorService _monitorService; + private final RestClient _esRestClient; + private final int _replicationFactor; private final int _numShards; private final int _bulkIndexingSize; private HashAlgorithm _idgenHashAlgo; - - @Inject + + @Inject public ElasticSearchSchemaService(SystemConfiguration config, MonitorService monitorService) { super(config); - + _monitorService = monitorService; _mapper = _createObjectMapper(); - + String algorithm = config.getValue(Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getName(), Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getDefaultValue()); try { _idgenHashAlgo = HashAlgorithm.fromString(algorithm); @@ -97,21 +97,21 @@ public ElasticSearchSchemaService(SystemConfiguration config, MonitorService mon _logger.warn("{} is not supported by this service. 
Valid values are: {}.", algorithm, Arrays.asList(HashAlgorithm.values())); _idgenHashAlgo = HashAlgorithm.MD5; } - + _logger.info("Using {} for Elasticsearch document id generation.", _idgenHashAlgo); - + _replicationFactor = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_NUM_REPLICAS.getName(), Property.ELASTICSEARCH_NUM_REPLICAS.getDefaultValue())); - + _numShards = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_SHARDS_COUNT.getName(), Property.ELASTICSEARCH_SHARDS_COUNT.getDefaultValue())); - + _bulkIndexingSize = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getName(), Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getDefaultValue())); - + String[] nodes = config.getValue(Property.ELASTICSEARCH_ENDPOINT.getName(), Property.ELASTICSEARCH_ENDPOINT.getDefaultValue()).split(","); HttpHost[] httpHosts = new HttpHost[nodes.length]; - + for(int i=0; i metrics) { SystemAssert.requireArgument(metrics != null, "Metrics list cannot be null."); - + _logger.info("{} new metrics need to be indexed on ES.", metrics.size()); - + long start = System.currentTimeMillis(); List> fracturedList = _fracture(metrics); - + for(List records : fracturedList) { if(!records.isEmpty()) { if(_syncPut) { @@ -205,22 +205,22 @@ protected void implementationSpecificPut(List metrics) { } } } - + int count = 0; for(List records : fracturedList) { count += records.size(); } - + _monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITTEN, count, null); _monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITE_LATENCY, (System.currentTimeMillis() - start), null); } - + /* Convert the given list of metrics to a list of metric schema records. At the same time, fracture the records list * if its size is greater than INDEXING_BATCH_SIZE. 
*/ - private List> _fracture(List metrics) { + protected List> _fracture(List metrics) { List> fracturedList = new ArrayList<>(); - + List records = new ArrayList<>(_bulkIndexingSize); for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { @@ -233,41 +233,41 @@ private List> _fracture(List metrics) { } continue; } - + for(Map.Entry entry : metric.getTags().entrySet()) { records.add(new MetricSchemaRecord(metric.getNamespace(), metric.getScope(), metric.getMetric(), - entry.getKey(), entry.getValue())); + entry.getKey(), entry.getValue())); if(records.size() == _bulkIndexingSize) { fracturedList.add(records); records = new ArrayList<>(_bulkIndexingSize); } } } - + fracturedList.add(records); return fracturedList; } - + @Override public List get(MetricSchemaRecordQuery query) { requireNotDisposed(); - SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); - long size = (long) query.getLimit() * query.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "REGEXP_WITHOUT_AGGREGATION"); - long start = System.currentTimeMillis(); - boolean scroll = false; + SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); + long size = (long) query.getLimit() * query.getPage(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "REGEXP_WITHOUT_AGGREGATION"); + long start = System.currentTimeMillis(); + boolean scroll = false; StringBuilder sb = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search"); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search"); + int from = 0, scrollSize; if(query.getLimit() * query.getPage() > 10000) { sb.append("?scroll=").append(KEEP_SCROLL_CONTEXT_OPEN_FOR); @@ -278,94 +278,94 @@ public List get(MetricSchemaRecordQuery query) { from = query.getLimit() * (query.getPage() - 1); scrollSize = query.getLimit(); } - + String requestUrl = sb.toString(); String queryJson = _constructTermQuery(query, from, scrollSize); - + try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); - + MetricSchemaRecordList list = toEntity(extractResponse(response), new TypeReference() {}); - + if(scroll) { requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString(); List records = new LinkedList<>(list.getRecords()); - + while(true) { String scrollID = list.getScrollID(); - + Map requestBody = new HashMap<>(); requestBody.put("scroll_id", scrollID); requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR); - + response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(new ObjectMapper().writeValueAsString(requestBody))); - + list = toEntity(extractResponse(response), new TypeReference() {}); records.addAll(list.getRecords()); - + if(records.size() >= query.getLimit() * query.getPage() || list.getRecords().size() < scrollSize) { break; } } - + int fromIndex = query.getLimit() * (query.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
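To make the paging arithmetic above concrete, a worked example with illustrative numbers: limit = 500 and page = 25 give limit * page = 12500, which exceeds the 10000 result window, so the scroll path is taken with from = 0 and scrollSize = 500. Each follow-up request keeps the context alive by posting the scroll id back to /_search/scroll:

    { "scroll_id": "<id from the previous response>", "scroll": "1m" }

The loop exits once 12500 records are buffered or a page comes back with fewer than 500 hits; fromIndex = 500 * 24 = 12000, so the caller receives records.subList(12000, records.size()).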
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, records.size()); - + } else { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return list.getRecords(); } - + } catch (UnsupportedEncodingException | JsonProcessingException e) { throw new SystemException("Search failed.", e); } catch (IOException e) { throw new SystemException("IOException when trying to perform ES request.", e); } } - + @Override public List getUnique(MetricSchemaRecordQuery query, RecordType type) { requireNotDisposed(); SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); long size = (long) query.getLimit() * query.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "REGEXP_WITH_AGGREGATION"); - long start = System.currentTimeMillis(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "REGEXP_WITH_AGGREGATION"); + long start = System.currentTimeMillis(); String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search") + .toString(); + String queryJson = _constructTermAggregationQuery(query, type); try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); String str = extractResponse(response); List records = SchemaService.constructMetricSchemaRecordsForType( - toEntity(str, new TypeReference>() {}), type); + toEntity(str, new TypeReference>() {}), type); + int fromIndex = query.getLimit() * (query.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + if(records.size() < query.getLimit() * query.getPage()) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); @@ -379,31 +379,31 @@ public List getUnique(MetricSchemaRecordQuery query, RecordT throw new SystemException(e); } } - + @Override public List keywordSearch(KeywordQuery kq) { requireNotDisposed(); - SystemAssert.requireArgument(kq != null, "Query cannot be null."); - SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null."); - - long size = (long) kq.getLimit() * kq.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "FTS_WITH_AGGREGATION"); - long start = System.currentTimeMillis(); + SystemAssert.requireArgument(kq != null, "Query cannot be null."); + SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null."); + + long size = (long) kq.getLimit() * kq.getPage(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "FTS_WITH_AGGREGATION"); + long start = System.currentTimeMillis(); StringBuilder sb = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search"); + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search"); try { - + if(kq.getQuery() != null) { - + int from = 0, scrollSize = 0; boolean scroll = false; if(kq.getLimit() * kq.getPage() > 10000) { @@ -415,82 +415,82 @@ public List keywordSearch(KeywordQuery kq) { from = kq.getLimit() * (kq.getPage() - 1); scrollSize = kq.getLimit(); } - + List tokens = _analyzedTokens(kq.getQuery()); String queryJson = _constructQueryStringQuery(tokens, from, scrollSize); String requestUrl = sb.toString(); - + Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); String strResponse = extractResponse(response); MetricSchemaRecordList list = toEntity(strResponse, new TypeReference() {}); - + if(scroll) { requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString(); List records = new LinkedList<>(list.getRecords()); - + while(true) { Map requestBody = new HashMap<>(); requestBody.put("scroll_id", list.getScrollID()); requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR); - + response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(new ObjectMapper().writeValueAsString(requestBody))); - + list = toEntity(extractResponse(response), new TypeReference() {}); - + records.addAll(list.getRecords()); - + if(records.size() >= kq.getLimit() * kq.getPage() || list.getRecords().size() < scrollSize) { break; } } - + int fromIndex = kq.getLimit() * (kq.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, records.size()); - + } else { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return list.getRecords(); } - - + + } else { Map> tokensMap = new HashMap<>(); - + List tokens = _analyzedTokens(kq.getScope()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.SCOPE, tokens); } - + tokens = _analyzedTokens(kq.getMetric()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.METRIC, tokens); } - + tokens = _analyzedTokens(kq.getTagKey()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.TAGK, tokens); } - + tokens = _analyzedTokens(kq.getTagValue()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.TAGV, 
tokens); } - + tokens = _analyzedTokens(kq.getNamespace()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.NAMESPACE, tokens); } - + String queryJson = _constructQueryStringQuery(kq, tokensMap); String requestUrl = sb.toString(); Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); @@ -498,14 +498,14 @@ public List keywordSearch(KeywordQuery kq) { List records = SchemaService.constructMetricSchemaRecordsForType( toEntity(strResponse, new TypeReference>() {}), kq.getType()); - + int fromIndex = kq.getLimit() * (kq.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + if(records.size() < kq.getLimit() * kq.getPage()) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); @@ -515,27 +515,27 @@ public List keywordSearch(KeywordQuery kq) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, kq.getLimit() * kq.getPage()); } - + } - + } catch (IOException e) { throw new SystemException(e); } } - - + + private List _analyzedTokens(String query) { - + if(!SchemaService.containsFilter(query)) { return Collections.emptyList(); } - + List tokens = new ArrayList<>(); - + String requestUrl = new StringBuilder("/").append(INDEX_NAME).append("/_analyze").toString(); - + String requestBody = "{\"analyzer\" : \"metadata_analyzer\", \"text\": \"" + query + "\" }"; - + try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(requestBody)); String strResponse = extractResponse(response); @@ -545,7 +545,7 @@ private List _analyzedTokens(String query) { tokens.add(tokenNode.get("token").asText()); } } - + return tokens; } catch (IOException e) { throw new SystemException(e); @@ -554,17 +554,17 @@ private List _analyzedTokens(String query) { private void _upsert(List records) { - + String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_bulk") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_bulk") + .toString(); + String strResponse = ""; - + MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo); try { String requestBody = _mapper.writeValueAsString(msrList); @@ -572,10 +572,9 @@ private void _upsert(List records) { strResponse = extractResponse(response); } catch (IOException e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? - _removeFromTrie(records); throw new SystemException(e); } - + try { PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class); //TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off. @@ -586,42 +585,45 @@ private void _upsert(List records) { _logger.warn("Failed to index metric. 
Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); recordsToRemove.add(msrList.getRecord(item.create._id)); } - + if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error)); recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _removeFromTrie(recordsToRemove); + if(recordsToRemove.size() != 0) { + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); + } } + //add to bloom filter + _addToBloomFilter(records); } catch(IOException e) { - _removeFromTrie(records); throw new SystemException("Failed to parse response of put metrics. The response was: " + strResponse, e); } } - + private void _upsertAsync(List records) { - + String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_bulk") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_bulk") + .toString(); + MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo); StringEntity entity; try { String requestBody = _mapper.writeValueAsString(msrList); entity = new StringEntity(requestBody); } catch (JsonProcessingException | UnsupportedEncodingException e) { - _removeFromTrie(records); throw new SystemException("Failed to parse metrics to schema records when indexing.", e); } - + ResponseListener responseListener = new ResponseListener() { - + @Override public void onSuccess(Response response) { String strResponse = extractResponse(response); @@ -635,84 +637,87 @@ public void onSuccess(Response response) { _logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); recordsToRemove.add(msrList.getRecord(item.create._id)); } - + if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error)); recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _removeFromTrie(recordsToRemove); - } + if(recordsToRemove.size() != 0) { + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); + } + } + //add to bloom filter + _addToBloomFilter(records); } catch(IOException e) { - _removeFromTrie(records); _logger.warn("Failed to parse response of put metrics. The response was: " + strResponse, e); } } - + @Override public void onFailure(Exception e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException??
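One shape the exponential back-off mentioned in this TODO could take, sketched with assumed tuning values (three attempts, 200 ms starting pause) and reusing the names already in scope here:

    long pauseMs = 200;
    for (int attempt = 1; ; attempt++) {
        try {
            _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl,
                    Collections.emptyMap(), entity);
            break;
        } catch (IOException e) {
            if (attempt == 3) {
                throw new SystemException(e);    // give up after three attempts
            }
            try {
                Thread.sleep(pauseMs);           // 200 ms, then 400 ms, then 800 ms
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new SystemException(e);
            }
            pauseMs *= 2;
        }
    }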
- _removeFromTrie(records); _logger.warn("Failed to execute the indexing request.", e); } }; - + _esRestClient.performRequestAsync(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), entity, responseListener); } - - private void _removeFromTrie(List records) { - _logger.info("Removing {} records from trie.", records.size()); - for(MetricSchemaRecord record : records) { - String key = constructTrieKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); - TRIE.remove(key); - } + + protected void _addToBloomFilter(List records){ + _logger.info("Adding {} records into bloom filter.", records.size()); + for(MetricSchemaRecord record : records) { + String key = constructKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); + bloomFilter.put(key); + } } - + private String _constructTermAggregationQuery(MetricSchemaRecordQuery query, RecordType type) { ObjectMapper mapper = new ObjectMapper(); ObjectNode queryNode = _constructQueryNode(query, mapper); - + long size = query.getLimit() * query.getPage(); SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, "(limit * page) must be greater than 0 and less than Integer.MAX_VALUE"); - + ObjectNode aggsNode = _constructAggsNode(type, Math.max(size, 10000), mapper); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("size", 0); rootNode.put("aggs", aggsNode); - + return rootNode.toString(); } - + private String _constructTermQuery(MetricSchemaRecordQuery query, int from, int size) { ObjectMapper mapper = new ObjectMapper(); - + ObjectNode queryNode = _constructQueryNode(query, mapper); - + ObjectNode rootNode = _mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("from", from); rootNode.put("size", size); - + return rootNode.toString(); } - + private ObjectNode _constructSimpleQueryStringNode(List tokens, RecordType... 
types) { - + if(tokens.isEmpty()) { return null; } - + ObjectMapper mapper = new ObjectMapper(); - + StringBuilder queryString = new StringBuilder(); for(String token : tokens) { queryString.append('+').append(token).append(' '); } queryString.replace(queryString.length() - 1, queryString.length(), "*"); - + ObjectNode node = mapper.createObjectNode(); ArrayNode fieldsNode = mapper.createArrayNode(); for(RecordType type : types) { @@ -720,54 +725,54 @@ private ObjectNode _constructSimpleQueryStringNode(List tokens, RecordTy } node.put("fields", fieldsNode); node.put("query", queryString.toString()); - + ObjectNode simpleQueryStringNode = mapper.createObjectNode(); simpleQueryStringNode.put("simple_query_string", node); - + return simpleQueryStringNode; } - + private String _constructQueryStringQuery(List tokens, int from, int size) { ObjectMapper mapper = new ObjectMapper(); - + ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(tokens, RecordType.values()); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", simpleQueryStringNode); rootNode.put("from", from); rootNode.put("size", size); - + return rootNode.toString(); } - + private String _constructQueryStringQuery(KeywordQuery kq, Map> tokensMap) { ObjectMapper mapper = new ObjectMapper(); - + ArrayNode filterNodes = mapper.createArrayNode(); for(Map.Entry> entry : tokensMap.entrySet()) { ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(entry.getValue(), entry.getKey()); filterNodes.add(simpleQueryStringNode); } - + ObjectNode boolNode = mapper.createObjectNode(); boolNode.put("filter", filterNodes); - + ObjectNode queryNode = mapper.createObjectNode(); queryNode.put("bool", boolNode); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("size", 0); - + long size = kq.getLimit() * kq.getPage(); SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, "(limit * page) must be greater than 0 and less than Integer.MAX_VALUE"); rootNode.put("aggs", _constructAggsNode(kq.getType(), Math.max(size, 10000), mapper)); - + return rootNode.toString(); - + } - + private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapper mapper) { ArrayNode filterNodes = mapper.createArrayNode(); if(SchemaService.containsFilter(query.getMetric())) { @@ -777,7 +782,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getScope())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -785,7 +790,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getTagKey())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -793,7 +798,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getTagValue())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -801,7 +806,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getNamespace())) { ObjectNode node = mapper.createObjectNode(); 
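For orientation, the bool/filter query assembled by these builders serializes to roughly the following for a hypothetical query with scope and metric filters (the exact field names depend on RecordType, and _constructTermQuery then wraps the query node with from and size):

    { "query": { "bool": { "filter": [
        { "regexp": { "metric.raw": "cpu*" } },
        { "regexp": { "scope.raw": "system*" } }
    ] } }, "from": 0, "size": 500 }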
ObjectNode regexpNode = mapper.createObjectNode(); @@ -809,186 +814,186 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + ObjectNode boolNode = mapper.createObjectNode(); boolNode.put("filter", filterNodes); - + ObjectNode queryNode = mapper.createObjectNode(); queryNode.put("bool", boolNode); return queryNode; } - - + + private ObjectNode _constructAggsNode(RecordType type, long limit, ObjectMapper mapper) { - + ObjectNode termsNode = mapper.createObjectNode(); termsNode.put("field", type.getName() + ".raw"); termsNode.put("order", mapper.createObjectNode().put("_term", "asc")); termsNode.put("size", limit); termsNode.put("execution_hint", "map"); - + ObjectNode distinctValuesNode = mapper.createObjectNode(); distinctValuesNode.put("terms", termsNode); - + ObjectNode aggsNode = mapper.createObjectNode(); aggsNode.put("distinct_values", distinctValuesNode); return aggsNode; } - - + + /* Helper method to convert JSON String representation to the corresponding Java entity. */ - private T toEntity(String content, TypeReference type) { - try { - return _mapper.readValue(content, type); - } catch (IOException ex) { - throw new SystemException(ex); - } - } - - + private T toEntity(String content, TypeReference type) { + try { + return _mapper.readValue(content, type); + } catch (IOException ex) { + throw new SystemException(ex); + } + } + + /** Helper to process the response. * Throws a SystemException when the http status code is outside of the range 200 - 300. */ - private String extractResponse(Response response) { - requireArgument(response != null, "HttpResponse object cannot be null."); - - int status = response.getStatusLine().getStatusCode(); - String strResponse = extractStringResponse(response); - - if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) { - throw new SystemException("Status code: " + status + " . Error occurred. " + strResponse); - } else { - return strResponse; - } - } - - private String extractStringResponse(Response content) { - requireArgument(content != null, "Response content is null."); - - String result; - HttpEntity entity = null; - - try { - entity = content.getEntity(); - if (entity == null) { - result = ""; - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - entity.writeTo(baos); - result = baos.toString("UTF-8"); - } - return result; - } catch (IOException ex) { - throw new SystemException(ex); - } finally { - if (entity != null) { - try { - EntityUtils.consume(entity); - } catch (IOException ex) { - _logger.warn("Failed to close entity stream.", ex); - } - } - } - } - + private String extractResponse(Response response) { + requireArgument(response != null, "HttpResponse object cannot be null."); + + int status = response.getStatusLine().getStatusCode(); + String strResponse = extractStringResponse(response); + + if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) { + throw new SystemException("Status code: " + status + ". Error occurred. 
" + strResponse); + } else { + return strResponse; + } + } + + private String extractStringResponse(Response content) { + requireArgument(content != null, "Response content is null."); + + String result; + HttpEntity entity = null; + + try { + entity = content.getEntity(); + if (entity == null) { + result = ""; + } else { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + entity.writeTo(baos); + result = baos.toString("UTF-8"); + } + return result; + } catch (IOException ex) { + throw new SystemException(ex); + } finally { + if (entity != null) { + try { + EntityUtils.consume(entity); + } catch (IOException ex) { + _logger.warn("Failed to close entity stream.", ex); + } + } + } + } + private ObjectMapper _createObjectMapper() { ObjectMapper mapper = new ObjectMapper(); - + mapper.setSerializationInclusion(Include.NON_NULL); SimpleModule module = new SimpleModule(); module.addSerializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Serializer()); module.addDeserializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Deserializer()); module.addDeserializer(List.class, new MetricSchemaRecordList.AggDeserializer()); mapper.registerModule(module); - + return mapper; } - - - private ObjectNode _createSettingsNode() { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode metadataAnalyzer = mapper.createObjectNode(); - metadataAnalyzer.put("tokenizer", "metadata_tokenizer"); - metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase")); - - ObjectNode analyzerNode = mapper.createObjectNode(); - analyzerNode.put("metadata_analyzer", metadataAnalyzer); - - ObjectNode tokenizerNode = mapper.createObjectNode(); - tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])")); - - ObjectNode analysisNode = mapper.createObjectNode(); - analysisNode.put("analyzer", analyzerNode); - analysisNode.put("tokenizer", tokenizerNode); - - ObjectNode indexNode = mapper.createObjectNode(); - indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW); - indexNode.put("number_of_replicas", _replicationFactor); - indexNode.put("number_of_shards", _numShards); - - ObjectNode settingsNode = mapper.createObjectNode(); - settingsNode.put("analysis", analysisNode); - settingsNode.put("index", indexNode); - - return settingsNode; - - } - - - private ObjectNode _createMappingsNode() { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode propertiesNode = mapper.createObjectNode(); - propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.NAMESPACE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - - propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE)); - - ObjectNode typeNode = mapper.createObjectNode(); - typeNode.put("properties", propertiesNode); - - ObjectNode mappingsNode = mapper.createObjectNode(); - mappingsNode.put(TYPE_NAME, typeNode); - - return mappingsNode; - } - - - private ObjectNode _createFieldNode(String type) { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode fieldNode = mapper.createObjectNode(); - fieldNode.put("type", type); - fieldNode.put("analyzer", "metadata_analyzer"); - 
ObjectNode keywordNode = mapper.createObjectNode(); - keywordNode.put("type", "keyword"); - ObjectNode fieldsNode = mapper.createObjectNode(); - fieldsNode.put("raw", keywordNode); - fieldNode.put("fields", fieldsNode); - return fieldNode; - } - + + + private ObjectNode _createSettingsNode() { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode metadataAnalyzer = mapper.createObjectNode(); + metadataAnalyzer.put("tokenizer", "metadata_tokenizer"); + metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase")); + + ObjectNode analyzerNode = mapper.createObjectNode(); + analyzerNode.put("metadata_analyzer", metadataAnalyzer); + + ObjectNode tokenizerNode = mapper.createObjectNode(); + tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])")); + + ObjectNode analysisNode = mapper.createObjectNode(); + analysisNode.put("analyzer", analyzerNode); + analysisNode.put("tokenizer", tokenizerNode); + + ObjectNode indexNode = mapper.createObjectNode(); + indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW); + indexNode.put("number_of_replicas", _replicationFactor); + indexNode.put("number_of_shards", _numShards); + + ObjectNode settingsNode = mapper.createObjectNode(); + settingsNode.put("analysis", analysisNode); + settingsNode.put("index", indexNode); + + return settingsNode; + + } + + + private ObjectNode _createMappingsNode() { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode propertiesNode = mapper.createObjectNode(); + propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.NAMESPACE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + + propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE)); + + ObjectNode typeNode = mapper.createObjectNode(); + typeNode.put("properties", propertiesNode); + + ObjectNode mappingsNode = mapper.createObjectNode(); + mappingsNode.put(TYPE_NAME, typeNode); + + return mappingsNode; + } + + + private ObjectNode _createFieldNode(String type) { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode fieldNode = mapper.createObjectNode(); + fieldNode.put("type", type); + fieldNode.put("analyzer", "metadata_analyzer"); + ObjectNode keywordNode = mapper.createObjectNode(); + keywordNode.put("type", "keyword"); + ObjectNode fieldsNode = mapper.createObjectNode(); + fieldsNode.put("raw", keywordNode); + fieldNode.put("fields", fieldsNode); + return fieldNode; + } + private void _createIndexIfNotExists() { try { Response response = _esRestClient.performRequest(HttpMethod.HEAD.getName(), "/" + INDEX_NAME); boolean indexExists = response.getStatusLine().getStatusCode() == HttpStatus.SC_OK ? true : false; - + if(!indexExists) { _logger.info("Index [" + INDEX_NAME + "] does not exist. 
Will create one."); ObjectMapper mapper = new ObjectMapper(); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("settings", _createSettingsNode()); rootNode.put("mappings", _createMappingsNode()); - + String settingsAndMappingsJson = rootNode.toString(); String requestUrl = new StringBuilder().append("/").append(INDEX_NAME).toString(); - + response = _esRestClient.performRequest(HttpMethod.PUT.getName(), requestUrl, Collections.emptyMap(), new StringEntity(settingsAndMappingsJson)); extractResponse(response); } @@ -996,26 +1001,26 @@ private void _createIndexIfNotExists() { _logger.error("Failed to check/create elasticsearch index. ElasticSearchSchemaService may not function.", e); } } - - /** - * Enumeration of supported HTTP methods. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - private enum HttpMethod { - - /** POST operation. */ - POST("POST"), - /** PUT operation. */ - PUT("PUT"), - /** HEAD operation. */ - HEAD("HEAD"); - - private String name; - - HttpMethod(String name) { - this.setName(name); - } + + /** + * Enumeration of supported HTTP methods. + * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + private enum HttpMethod { + + /** POST operation. */ + POST("POST"), + /** PUT operation. */ + PUT("PUT"), + /** HEAD operation. */ + HEAD("HEAD"); + + private String name; + + HttpMethod(String name) { + this.setName(name); + } public String getName() { return name; @@ -1024,70 +1029,70 @@ public String getName() { public void setName(String name) { this.name = name; } - } - - + } + + /** - * The set of implementation specific configuration properties. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - public enum Property { - - ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"), - /** Connection timeout for ES REST client. */ - ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"), - /** Socket connection timeout for ES REST client. */ - ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"), - /** Connection count for ES REST client. */ - ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"), - /** Replication factor for metadata_index. */ - ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"), - /** Shard count for metadata_index. */ - ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"), - /** The no. of records to batch for bulk indexing requests. - * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests - */ - ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"), - /** The hashing algorithm to use for generating document id. */ - ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5"); - - private final String _name; - private final String _defaultValue; - - private Property(String name, String defaultValue) { - _name = name; - _defaultValue = defaultValue; - } - - /** - * Returns the property name. - * - * @return The property name. - */ - public String getName() { - return _name; - } - - /** - * Returns the default value for the property. - * - * @return The default value. 
- */ - public String getDefaultValue() { - return _defaultValue; - } - } - - - static class PutResponse { - private int took; - private boolean errors; - private List items; - - public PutResponse() {} - - public int getTook() { + * The set of implementation specific configuration properties. + * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + public enum Property { + + ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"), + /** Connection timeout for ES REST client. */ + ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"), + /** Socket connection timeout for ES REST client. */ + ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"), + /** Connection count for ES REST client. */ + ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"), + /** Replication factor for metadata_index. */ + ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"), + /** Shard count for metadata_index. */ + ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"), + /** The no. of records to batch for bulk indexing requests. + * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests + */ + ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"), + /** The hashing algorithm to use for generating document id. */ + ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5"); + + private final String _name; + private final String _defaultValue; + + private Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. 
+		 */
+		public String getDefaultValue() {
+			return _defaultValue;
+		}
+	}
+
+
+	static class PutResponse {
+		private int took;
+		private boolean errors;
+		private List<Item> items;
+
+		public PutResponse() {}
+
+		public int getTook() {
 			return took;
 		}
@@ -1113,10 +1118,10 @@ public void setItems(List<Item> items) {
 	@JsonIgnoreProperties(ignoreUnknown = true)
 	static class Item {
-		private CreateItem create;
-		private CreateItem index;
-
-		public Item() {}
+		private CreateItem create;
+		private CreateItem index;
+
+		public Item() {}
 
 		public CreateItem getCreate() {
 			return create;
@@ -1125,7 +1130,7 @@ public CreateItem getCreate() {
 		public void setCreate(CreateItem create) {
 			this.create = create;
 		}
-
+
 		public CreateItem getIndex() {
 			return index;
 		}
@@ -1133,46 +1138,46 @@ public CreateItem getIndex() {
 		public void setIndex(CreateItem index) {
 			this.index = index;
 		}
-	}
-
+	}
+
 	@JsonIgnoreProperties(ignoreUnknown = true)
-	static class CreateItem {
-		private String _index;
-		private String _type;
-		private String _id;
-		private int status;
-		private Error error;
-
-		public CreateItem() {}
-
+	static class CreateItem {
+		private String _index;
+		private String _type;
+		private String _id;
+		private int status;
+		private Error error;
+
+		public CreateItem() {}
+
 		public String get_index() {
 			return _index;
 		}
-
+
 		public void set_index(String _index) {
 			this._index = _index;
 		}
-
+
 		public String get_type() {
 			return _type;
 		}
-
+
 		public void set_type(String _type) {
 			this._type = _type;
 		}
-
+
 		public String get_id() {
 			return _id;
 		}
-
+
 		public void set_id(String _id) {
 			this._id = _id;
 		}
-
+
 		public int getStatus() {
 			return status;
 		}
-
+
 		public void setStatus(int status) {
 			this.status = status;
 		}
@@ -1184,13 +1189,13 @@ public Error getError() {
 		public void setError(Error error) {
 			this.error = error;
 		}
-	}
-
+	}
+
 	@JsonIgnoreProperties(ignoreUnknown = true)
 	static class Error {
 		private String type;
 		private String reason;
-
+
 		public Error() {}
 
 		public String getType() {
@@ -1209,6 +1214,6 @@ public void setReason(String reason) {
 			this.reason = reason;
 		}
 	}
-	}
+	}
 }
diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
index 55293b376..dc6d52bf1 100644
--- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
+++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
@@ -3,7 +3,10 @@
 import static org.junit.Assert.assertTrue;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -16,11 +19,12 @@
 import com.salesforce.dva.argus.AbstractTest;
 import com.salesforce.dva.argus.entity.Metric;
+import com.salesforce.dva.argus.entity.MetricSchemaRecord;
 import com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService;
 
 /**
- * This test suite tests the trie-based caching in the AbstractSchemaService class. Although we are instantiating
+ * This test suite tests the bloom filter caching in the AbstractSchemaService class. Although we are instantiating
  * ElasticSearchSchemaService object, the implementationSpecificPut (which is part of ES Schema Service) has been
  * mocked out. In essence, these tests only test the caching functionality.
 *
@@ -31,47 +35,48 @@ public class AbstractSchemaServiceTest extends AbstractTest {
 
 	@Test
 	public void testPutEverythingCached() {
-
 		List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
 
 		ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
-		_enableCaching(service);
 		final AtomicInteger count = new AtomicInteger();
 		ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
 
 		spyService.put(metrics);
+		// Add the first fractured batch of metrics to the bloom filter cache.
+		spyService._addToBloomFilter(spyService._fracture(metrics).get(0));
 		assertTrue(count.get() == metrics.size());
 		spyService.put(metrics);
+		// The count stays the same because the second put is answered from the cache.
 		assertTrue(count.get() == metrics.size());
 	}
 
 	@Test
 	public void testPutPartialCached() {
-
 		List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
 		List<Metric> newMetrics = createRandomMetrics("test-scope", "test-metric1", 5);
 		Set<Metric> total = new HashSet<>(metrics);
 		total.addAll(newMetrics);
 
 		ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
-		_enableCaching(service);
 		final AtomicInteger count = new AtomicInteger();
 		ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
 
 		spyService.put(metrics);
+		// The first batch of metrics is now cached.
+		spyService._addToBloomFilter(spyService._fracture(metrics).get(0));
 		assertTrue(count.get() == metrics.size());
+		// The first batch is already in the cache (the partial-hit scenario), so only the new
+		// metrics are written; afterwards the cache holds metrics.size() + newMetrics.size() entries.
 		spyService.put(new ArrayList<>(total));
+		spyService._addToBloomFilter(spyService._fracture(new ArrayList<>(total)).get(0));
 		assertTrue(count.get() == total.size());
-
 	}
 
 	@Test
 	public void testPutNothingCached() {
-
 		List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
 		List<Metric> newMetrics = createRandomMetrics("test-scope", "test-metric1", 5);
 
 		ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
-		_enableCaching(service);
 		final AtomicInteger count = new AtomicInteger();
 		ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
 
@@ -80,22 +85,7 @@ public void testPutNothingCached() {
 		spyService.put(newMetrics);
 		assertTrue(count.get() == metrics.size() + newMetrics.size());
 	}
-
-	@Test
-	public void testPutCachingDisabled() {
-
-		List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
-
-		ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
-		final AtomicInteger count = new AtomicInteger();
-		ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
-
-		spyService.put(metrics);
-		assertTrue(count.get() == metrics.size());
-		spyService.put(metrics);
-		assertTrue(count.get() == metrics.size() * 2);
-	}
-
+
 	private ElasticSearchSchemaService _initializeSpyService(ElasticSearchSchemaService service, final AtomicInteger count) {
 
 		ElasticSearchSchemaService spyService = Mockito.spy(service);
@@ -111,17 +101,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
 
 		return spyService;
 	}
-
-
-	private void _enableCaching(ElasticSearchSchemaService service) {
-		try {
-			Field field = service.getClass().getSuperclass().getDeclaredField("_cacheEnabled");
-			field.setAccessible(true);
-			field.set(service, true);
-		} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
+	@Test
+	public void getNumHoursUntilNextFlushBloomFilter() {
+		ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
+
+		Calendar calendar = Calendar.getInstance();
+
+		// The scheduler waits a full 24 hours before the next flush when the target hour equals the current hour.
+		int hour = calendar.get(Calendar.HOUR_OF_DAY);
+		assertTrue(service.getNumHoursUntilTargetHour(hour) == 24);
 	}
-
 }
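
The tests above pin down the new put-path contract: a schema record is written through to Elasticsearch only when the bloom filter does not already claim to contain its key, and a periodic flush rebuilds the filter so that a false positive cannot suppress a record forever. A minimal, self-contained sketch of that flow against Guava's BloomFilter API follows; the key layout, sizing constants, and helper names here are illustrative stand-ins, not the service's actual code.

    import java.nio.charset.StandardCharsets;

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class BloomFilterCacheSketch {

        // A plausible reading of the scheduling arithmetic the new test asserts:
        // when the target flush hour equals the current hour, the next flush is
        // a full 24 hours away.
        static int numHoursUntilTargetHour(int currentHour, int targetHour) {
            return targetHour > currentHour ? targetHour - currentHour : targetHour + 24 - currentHour;
        }

        public static void main(String[] args) {
            // Sizing parameters chosen for illustration; the service reads its own
            // expected-insertion count and error rate from configuration.
            BloomFilter<CharSequence> filter = BloomFilter.create(
                    Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.00001);

            // Illustrative cache key; the service derives one per schema record.
            String key = "test-scope:test-metric:{tagk=tagv}:namespace";

            // First put: the key cannot be present yet, so the record is written
            // through (this is where implementationSpecificPut would be invoked).
            if (!filter.mightContain(key)) {
                System.out.println("cache miss -> write schema record");
                filter.put(key);
            }

            // Second put: mightContain now returns true, so the write is skipped.
            // A false positive would skip a record that was never written, which
            // is why the filter is periodically discarded and rebuilt.
            if (filter.mightContain(key)) {
                System.out.println("cache hit -> skip write");
            }

            System.out.println(numHoursUntilTargetHour(13, 13)); // prints 24
        }
    }

Unlike the trie it replaces, the bloom filter's footprint is fixed up front by the expected-insertion count and error rate, at the cost of a bounded false-positive rate that the scheduled flush mitigates.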