From 86e3760eb82247239ab489e42b7df52d328bfa32 Mon Sep 17 00:00:00 2001 From: Dilip Date: Mon, 21 May 2018 14:15:29 -0700 Subject: [PATCH 01/19] 1) Update guava library version 2) Introduce BloomFilter with charset-default murmur hash function --- ArgusCore/pom.xml | 2 +- .../service/schema/AbstractSchemaService.java | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml index 6fb8f66af..2b9e276f3 100644 --- a/ArgusCore/pom.xml +++ b/ArgusCore/pom.xml @@ -309,7 +309,7 @@ com.google.guava guava - 19.0 + 23.0 org.apache.commons diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index c6d29387a..3db80cbd2 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -3,6 +3,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryType; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -12,6 +13,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.hash.BloomFilter; +import com.google.common.hash.Funnels; import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; import com.googlecode.concurrenttrees.radix.RadixTree; import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory; @@ -30,6 +33,7 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); private static final long POLL_INTERVAL_MS = 60 * 1000L; protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); + protected static final BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), (long) 400000000); private static boolean _writesToTrieEnabled = true; @@ -78,22 +82,26 @@ public void put(List metrics) { for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { String key = constructTrieKey(metric, null); - boolean found = TRIE.getValueForExactKey(key) != null; + // boolean found = TRIE.getValueForExactKey(key) != null; + boolean found = bloomFilter.mightContain(key); if(!found) { metricsToPut.add(metric); if(_writesToTrieEnabled) { - TRIE.putIfAbsent(key, VoidValue.SINGLETON); + bloomFilter.put(key); + // TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } } else { boolean newTags = false; for(Entry tagEntry : metric.getTags().entrySet()) { String key = constructTrieKey(metric, tagEntry); - boolean found = TRIE.getValueForExactKey(key) != null; + // boolean found = TRIE.getValueForExactKey(key) != null; + boolean found = bloomFilter.mightContain(key); if(!found) { newTags = true; if(_writesToTrieEnabled) { - TRIE.putIfAbsent(key, VoidValue.SINGLETON); + bloomFilter.put(key); + //TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } } From bf99bd9b1018ec8e32ac8540ead6b7054045b551 Mon Sep 17 00:00:00 2001 From: Dilip Date: Mon, 21 May 2018 14:31:30 -0700 Subject: [PATCH 02/19] Add false positive probability --- .../dva/argus/service/schema/AbstractSchemaService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git
a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 3db80cbd2..5d8bc9863 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -33,7 +33,8 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); private static final long POLL_INTERVAL_MS = 60 * 1000L; protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); - protected static final BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), (long) 400000000); + protected static final BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), + (long) 400000000 , 0.00001); private static boolean _writesToTrieEnabled = true; @@ -84,6 +85,7 @@ public void put(List metrics) { String key = constructTrieKey(metric, null); // boolean found = TRIE.getValueForExactKey(key) != null; boolean found = bloomFilter.mightContain(key); + _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { metricsToPut.add(metric); if(_writesToTrieEnabled) { @@ -97,6 +99,7 @@ public void put(List metrics) { String key = constructTrieKey(metric, tagEntry); // boolean found = TRIE.getValueForExactKey(key) != null; boolean found = bloomFilter.mightContain(key); + // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { newTags = true; if(_writesToTrieEnabled) { From ff1c0ce35cc4986f521ddfcdd1d7dbc4a9509593 Mon Sep 17 00:00:00 2001 From: Dilip Date: Mon, 21 May 2018 14:44:58 -0700 Subject: [PATCH 03/19] Default hash function MURMUR128_MIT2_64 used --- .../dva/argus/service/schema/AbstractSchemaService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 5d8bc9863..f18342eee 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -85,7 +85,7 @@ public void put(List metrics) { String key = constructTrieKey(metric, null); // boolean found = TRIE.getValueForExactKey(key) != null; boolean found = bloomFilter.mightContain(key); - _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); + // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { metricsToPut.add(metric); if(_writesToTrieEnabled) { From 6ac6e555a31a11d693a19194dd9a23b53b242db2 Mon Sep 17 00:00:00 2001 From: Dilip Date: Mon, 21 May 2018 15:33:02 -0700 Subject: [PATCH 04/19] Make bloom filter parameters configurable --- .../service/schema/AbstractSchemaService.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index f18342eee..7842b3785 100644 --- 
a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -25,6 +25,7 @@ import com.salesforce.dva.argus.entity.MetricSchemaRecordQuery; import com.salesforce.dva.argus.service.DefaultService; import com.salesforce.dva.argus.service.SchemaService; +import com.salesforce.dva.argus.service.schema.AsyncHbaseSchemaService.Property; import com.salesforce.dva.argus.system.SystemAssert; import com.salesforce.dva.argus.system.SystemConfiguration; @@ -33,8 +34,7 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); private static final long POLL_INTERVAL_MS = 60 * 1000L; protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); - protected static final BloomFilter bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), - (long) 400000000 , 0.00001); + protected static BloomFilter BLOOMFILTER; private static boolean _writesToTrieEnabled = true; @@ -45,6 +45,12 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc protected AbstractSchemaService(SystemConfiguration config) { super(config); + + int expectedNumberInsertions = Integer.parseInt(config.getValue(Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getName(), + Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getDefaultValue())); + double errorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(), + Property.BLOOMFILTER_ERROR_RATE.getDefaultValue())); + BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedNumberInsertions , errorRate); _cacheEnabled = Boolean.parseBoolean( config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue())); @@ -84,12 +90,12 @@ public void put(List metrics) { if(metric.getTags().isEmpty()) { String key = constructTrieKey(metric, null); // boolean found = TRIE.getValueForExactKey(key) != null; - boolean found = bloomFilter.mightContain(key); + boolean found = BLOOMFILTER.mightContain(key); // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { metricsToPut.add(metric); if(_writesToTrieEnabled) { - bloomFilter.put(key); + BLOOMFILTER.put(key); // TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } @@ -98,12 +104,12 @@ public void put(List metrics) { for(Entry tagEntry : metric.getTags().entrySet()) { String key = constructTrieKey(metric, tagEntry); // boolean found = TRIE.getValueForExactKey(key) != null; - boolean found = bloomFilter.mightContain(key); + boolean found = BLOOMFILTER.mightContain(key); // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { newTags = true; if(_writesToTrieEnabled) { - bloomFilter.put(key); + BLOOMFILTER.put(key); //TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } @@ -198,7 +204,10 @@ public enum Property { /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists, * and if it does then do not rewrite. Provide more heap space when using this option. 
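// A minimal, self-contained sketch of the construction this patch converges on:
// both sizing inputs now come from SystemConfiguration instead of being hard-coded.
// Guava derives the bit-array size from the two numbers; at the defaults above
// (400M insertions, 1e-5 error rate) that is roughly 1.1 GB of heap, which is why
// the surrounding comment asks for more heap space. The class name below is
// illustrative, not part of this patch; the hashing is Guava's default
// murmur3_128-based strategy that the PATCH 03 subject refers to.

import java.nio.charset.Charset;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

class BloomFilterSketch {
    static BloomFilter<CharSequence> create(int expectedInsertions, double errorRate) {
        return BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()),
                expectedInsertions, errorRate);
    }
}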
*/ CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"), - SYNC_PUT("service.property.schema.sync.put", "false"); + SYNC_PUT("service.property.schema.sync.put", "false"), + + BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), + BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"); private final String _name; private final String _defaultValue; From bf79450dfba2f9c7ac0332110f8363761d16f6b6 Mon Sep 17 00:00:00 2001 From: Dilip Date: Mon, 21 May 2018 15:34:22 -0700 Subject: [PATCH 05/19] Remove unused import --- .../dva/argus/service/schema/AbstractSchemaService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 7842b3785..7c0edb6ec 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -25,7 +25,6 @@ import com.salesforce.dva.argus.entity.MetricSchemaRecordQuery; import com.salesforce.dva.argus.service.DefaultService; import com.salesforce.dva.argus.service.SchemaService; -import com.salesforce.dva.argus.service.schema.AsyncHbaseSchemaService.Property; import com.salesforce.dva.argus.system.SystemAssert; import com.salesforce.dva.argus.system.SystemConfiguration; From a013440dd76fb3eff0a809537913686c90193662 Mon Sep 17 00:00:00 2001 From: Dilip Date: Tue, 22 May 2018 14:02:07 -0700 Subject: [PATCH 06/19] 1)BloomFilter monitor thread for getting statistics and flushing contents --- .../service/schema/AbstractSchemaService.java | 300 +++++++++--------- 1 file changed, 149 insertions(+), 151 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 7c0edb6ec..d9cf679f1 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -1,8 +1,5 @@ package com.salesforce.dva.argus.service.schema; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryPoolMXBean; -import java.lang.management.MemoryType; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -29,75 +26,77 @@ import com.salesforce.dva.argus.system.SystemConfiguration; public abstract class AbstractSchemaService extends DefaultService implements SchemaService { - - private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory(); + private static final long POLL_INTERVAL_MS = 60 * 1000L; + private static final long BLOOM_FILTER_FLUSH_INTERVAL = 6 * 60 * 60 * 1000L; protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); protected static BloomFilter BLOOMFILTER; - + private int bloomFilterExpectedNumberInsertions; + private double bloomFilterErrorRate; + private static boolean _writesToTrieEnabled = true; - - private final Logger _logger = LoggerFactory.getLogger(getClass()); - private final Thread _oldGenMonitorThread; + + private final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Thread _bloomFilterMonitorThread; private final boolean _cacheEnabled; - 
protected final boolean _syncPut; + protected final boolean _syncPut; protected AbstractSchemaService(SystemConfiguration config) { super(config); - int expectedNumberInsertions = Integer.parseInt(config.getValue(Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getName(), + bloomFilterExpectedNumberInsertions = Integer.parseInt(config.getValue(Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getName(), Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getDefaultValue())); - double errorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(), + bloomFilterErrorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(), Property.BLOOMFILTER_ERROR_RATE.getDefaultValue())); - BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedNumberInsertions , errorRate); - - _cacheEnabled = Boolean.parseBoolean( - config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue())); - _syncPut = Boolean.parseBoolean( - config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue())); - - _oldGenMonitorThread = new Thread(new OldGenMonitorThread(), "old-gen-monitor"); - if(_cacheEnabled) { - _oldGenMonitorThread.start(); - } + BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + + _cacheEnabled = Boolean.parseBoolean( + config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue())); + _syncPut = Boolean.parseBoolean( + config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue())); + + _bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor"); + if(_cacheEnabled) { + _bloomFilterMonitorThread.start(); + } } @Override public void put(Metric metric) { requireNotDisposed(); SystemAssert.requireArgument(metric != null, "Metric cannot be null."); - + put(Arrays.asList(metric)); } @Override public void put(List metrics) { requireNotDisposed(); - SystemAssert.requireArgument(metrics != null, "Metric list cannot be null."); - - //If cache is not enabled, call implementation specific put with the list of metrics. - if(!_cacheEnabled) { - implementationSpecificPut(metrics); - return; - } - - //If cache is enabled, create a list of metricsToPut that do not exist on the TRIE and then call implementation - // specific put with only those subset of metricsToPut. - List metricsToPut = new ArrayList<>(metrics.size()); - + SystemAssert.requireArgument(metrics != null, "Metric list cannot be null."); + + //If cache is not enabled, call implementation specific put with the list of metrics. + if(!_cacheEnabled) { + implementationSpecificPut(metrics); + return; + } + + //If cache is enabled, create a list of metricsToPut that do not exist on the TRIE and then call implementation + // specific put with only those subset of metricsToPut. 
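// Condensed shape of the membership check this hunk builds up -- a hedged sketch,
// with keyOf standing in for constructTrieKey. mightContain() can return a false
// positive, in which case a genuinely new schema record is skipped (the configured
// error rate bounds how often that happens); it never returns a false negative,
// so nothing already recorded is re-indexed.

static List<Metric> unseen(List<Metric> metrics, BloomFilter<CharSequence> filter,
        java.util.function.Function<Metric, String> keyOf) {
    List<Metric> misses = new ArrayList<>(metrics.size());
    for (Metric metric : metrics) {
        String key = keyOf.apply(metric);
        if (!filter.mightContain(key)) { // "definitely new" or a false positive
            filter.put(key);             // remember the key for subsequent puts
            misses.add(metric);
        }
    }
    return misses;                       // only this subset reaches the backing store
}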
+ List metricsToPut = new ArrayList<>(metrics.size()); + for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { String key = constructTrieKey(metric, null); // boolean found = TRIE.getValueForExactKey(key) != null; boolean found = BLOOMFILTER.mightContain(key); // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); - if(!found) { - metricsToPut.add(metric); - if(_writesToTrieEnabled) { - BLOOMFILTER.put(key); - // TRIE.putIfAbsent(key, VoidValue.SINGLETON); - } - } + if(!found) { + metricsToPut.add(metric); + if(_writesToTrieEnabled) { + BLOOMFILTER.put(key); + // TRIE.putIfAbsent(key, VoidValue.SINGLETON); + } + } } else { boolean newTags = false; for(Entry tagEntry : metric.getTags().entrySet()) { @@ -105,58 +104,58 @@ public void put(List metrics) { // boolean found = TRIE.getValueForExactKey(key) != null; boolean found = BLOOMFILTER.mightContain(key); // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); - if(!found) { - newTags = true; - if(_writesToTrieEnabled) { - BLOOMFILTER.put(key); - //TRIE.putIfAbsent(key, VoidValue.SINGLETON); - } - } + if(!found) { + newTags = true; + if(_writesToTrieEnabled) { + BLOOMFILTER.put(key); + //TRIE.putIfAbsent(key, VoidValue.SINGLETON); + } + } } - + if(newTags) { metricsToPut.add(metric); } } } - + implementationSpecificPut(metricsToPut); } - - + + protected abstract void implementationSpecificPut(List metrics); protected String constructTrieKey(Metric metric, Entry tagEntry) { StringBuilder sb = new StringBuilder(metric.getScope()); sb.append('\0').append(metric.getMetric()); - + if(metric.getNamespace() != null) { sb.append('\0').append(metric.getNamespace()); } - + if(tagEntry != null) { sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue()); } - + return sb.toString(); } - + protected String constructTrieKey(String scope, String metric, String tagk, String tagv, String namespace) { StringBuilder sb = new StringBuilder(scope); sb.append('\0').append(metric); - + if(namespace != null) { sb.append('\0').append(namespace); } - + if(tagk != null) { sb.append('\0').append(tagk); } - + if(tagv != null) { sb.append('\0').append(tagv); } - + return sb.toString(); } @@ -164,25 +163,25 @@ protected String constructTrieKey(String scope, String metric, String tagk, Stri @Override public void dispose() { requireNotDisposed(); - if (_oldGenMonitorThread != null && _oldGenMonitorThread.isAlive()) { - _logger.info("Stopping old gen monitor thread."); - _oldGenMonitorThread.interrupt(); - _logger.info("Old gen monitor thread interrupted."); - try { - _logger.info("Waiting for old gen monitor thread to terminate."); - _oldGenMonitorThread.join(); - } catch (InterruptedException ex) { - _logger.warn("Old gen monitor thread was interrupted while shutting down."); - } - _logger.info("System monitoring stopped."); - } else { - _logger.info("Requested shutdown of old gen monitor thread aborted, as it is not yet running."); - } + if (_bloomFilterMonitorThread != null && _bloomFilterMonitorThread.isAlive()) { + _logger.info("Stopping old gen monitor thread."); + _bloomFilterMonitorThread.interrupt(); + _logger.info("Old gen monitor thread interrupted."); + try { + _logger.info("Waiting for old gen monitor thread to terminate."); + _bloomFilterMonitorThread.join(); + } catch (InterruptedException ex) { + _logger.warn("Old gen monitor thread was interrupted while shutting down."); + } + _logger.info("System monitoring stopped."); + } else { + _logger.info("Requested 
shutdown of old gen monitor thread aborted, as it is not yet running."); + } } @Override public abstract Properties getServiceProperties(); - + @Override public abstract List get(MetricSchemaRecordQuery query); @@ -191,99 +190,98 @@ public void dispose() { @Override public abstract List keywordSearch(KeywordQuery query); - - + + + /** + * The set of implementation specific configuration properties. + * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + public enum Property { + + /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists, + * and if it does then do not rewrite. Provide more heap space when using this option. */ + CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"), + SYNC_PUT("service.property.schema.sync.put", "false"), + + BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), + BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"); + + private final String _name; + private final String _defaultValue; + + private Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. + */ + public String getDefaultValue() { + return _defaultValue; + } + } + + + //~ Inner Classes ******************************************************************************************************************************** + /** - * The set of implementation specific configuration properties. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - public enum Property { - - /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists, - * and if it does then do not rewrite. Provide more heap space when using this option. */ - CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"), - SYNC_PUT("service.property.schema.sync.put", "false"), - - BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), - BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"); - - private final String _name; - private final String _defaultValue; - - private Property(String name, String defaultValue) { - _name = name; - _defaultValue = defaultValue; - } - - /** - * Returns the property name. - * - * @return The property name. - */ - public String getName() { - return _name; - } - - /** - * Returns the default value for the property. - * - * @return The default value. - */ - public String getDefaultValue() { - return _defaultValue; - } - } - - - //~ Inner Classes ******************************************************************************************************************************** - - /** - * Old Generation monitoring thread. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - private class OldGenMonitorThread implements Runnable { + * Bloom Filter monitoring thread. 
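// For orientation, the thread below reduces to this interruptible polling skeleton
// (a simplified sketch; the real run() also flushes the filter on a schedule).
// approximateElementCount() estimates the element count from the set bits and needs
// the Guava 23.0 upgrade from PATCH 01; expectedFpp() reports the current
// false-positive probability given how full the filter already is.

public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Thread.sleep(POLL_INTERVAL_MS);
            _logger.info("Bloom approx no. elements = {}", BLOOMFILTER.approximateElementCount());
            _logger.info("Bloom expected error rate = {}", BLOOMFILTER.expectedFpp());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the flag so the loop exits
        }
    }
}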
+ * + * @author Dilip Devaraj (ddevaraj@salesforce.com) + */ + private class BloomFilterMonitorThread implements Runnable { + + long startTimeBeforeFlushInMillis; @Override public void run() { + startTimeBeforeFlushInMillis = System.currentTimeMillis(); while (!Thread.currentThread().isInterrupted()) { _sleepForPollPeriod(); if (!Thread.currentThread().isInterrupted()) { try { - _checkOldGenUsage(); + _checkBloomFilterUsage(); + + // flush out bloom filter every K hours + _flushBloomFilter(); } catch (Exception ex) { - _logger.warn("Exception occurred while checking old generation usage.", ex); + _logger.warn("Exception occurred while checking bloom filter usage.", ex); } } } } - private void _checkOldGenUsage() { - List memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans(); - for (MemoryPoolMXBean bean : memoryPoolBeans) { - if (bean.getType() == MemoryType.HEAP) { - String name = bean.getName().toLowerCase(); - if (name.contains("old") || name.contains("tenured")) { - long oldGenUsed = bean.getUsage().getUsed(); - _logger.info("Old Gen Memory = {} bytes", oldGenUsed); - _logger.info("Max JVM Memory = {} bytes", MAX_MEMORY); - if (oldGenUsed > 0.90 * MAX_MEMORY) { - _logger.info("JVM heap memory usage has exceeded 90% of the allocated heap memory. Disabling writes to TRIE."); - _writesToTrieEnabled = false; - } else if(oldGenUsed < 0.50 * MAX_MEMORY && !_writesToTrieEnabled) { - _logger.info("JVM heap memory usage is below 50% of the allocated heap memory and writes to TRIE is disabled. " - + "Enabling writes to TRIE now."); - _writesToTrieEnabled = true; - } - } + private void _checkBloomFilterUsage() { + _logger.info("Bloom approx no. elements = {}", BLOOMFILTER.approximateElementCount()); + _logger.info("Bloom expected error rate = {}", BLOOMFILTER.expectedFpp()); + } + + + private void _flushBloomFilter() { + long currentTimeInMillis = System.currentTimeMillis(); + if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL){ + _logger.info("Flushing out bloom filter entries"); + BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + startTimeBeforeFlushInMillis = currentTimeInMillis; } } private void _sleepForPollPeriod() { try { - _logger.info("Sleeping for {}s before checking old gen usage.", POLL_INTERVAL_MS / 1000); + _logger.info("Sleeping for {}s before checking bloom filter statistics.", POLL_INTERVAL_MS / 1000); Thread.sleep(POLL_INTERVAL_MS); } catch (InterruptedException ex) { _logger.warn("AbstractSchemaService memory monitor thread was interrupted."); From 12d6688e3982f9edc53af40e0ba924fd3e7f1d08 Mon Sep 17 00:00:00 2001 From: Dilip Date: Tue, 22 May 2018 14:31:15 -0700 Subject: [PATCH 07/19] 1) Remove TRIE-based code 2) On ES failures, do not remove inserted keys from BloomFilter since this operation is unsupported --- .../service/schema/AbstractSchemaService.java | 62 +++++-------------- .../schema/ElasticSearchSchemaService.java | 15 ----- 2 files changed, 14 insertions(+), 63 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index d9cf679f1..0b28945dc 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -12,10 +12,7 @@ import
com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; -import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree; -import com.googlecode.concurrenttrees.radix.RadixTree; -import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory; -import com.googlecode.concurrenttrees.radix.node.concrete.voidvalue.VoidValue; + import com.salesforce.dva.argus.entity.KeywordQuery; import com.salesforce.dva.argus.entity.Metric; import com.salesforce.dva.argus.entity.MetricSchemaRecord; @@ -29,13 +26,11 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private static final long POLL_INTERVAL_MS = 60 * 1000L; private static final long BLOOM_FILTER_FLUSH_INTERVAL = 6 * 60 * 60 * 1000L; - protected static final RadixTree TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory()); protected static BloomFilter BLOOMFILTER; + private static boolean _writesToBloomFilterEnabled = true; + private int bloomFilterExpectedNumberInsertions; private double bloomFilterErrorRate; - - private static boolean _writesToTrieEnabled = true; - private final Logger _logger = LoggerFactory.getLogger(getClass()); private final Thread _bloomFilterMonitorThread; private final boolean _cacheEnabled; @@ -80,35 +75,29 @@ public void put(List metrics) { return; } - //If cache is enabled, create a list of metricsToPut that do not exist on the TRIE and then call implementation + //If cache is enabled, create a list of metricsToPut that do not exist on the BLOOMFILTER and then call implementation // specific put with only those subset of metricsToPut. List metricsToPut = new ArrayList<>(metrics.size()); for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { - String key = constructTrieKey(metric, null); - // boolean found = TRIE.getValueForExactKey(key) != null; + String key = constructKey(metric, null); boolean found = BLOOMFILTER.mightContain(key); - // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { metricsToPut.add(metric); - if(_writesToTrieEnabled) { + if(_writesToBloomFilterEnabled) { BLOOMFILTER.put(key); - // TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } } else { boolean newTags = false; for(Entry tagEntry : metric.getTags().entrySet()) { - String key = constructTrieKey(metric, tagEntry); - // boolean found = TRIE.getValueForExactKey(key) != null; + String key = constructKey(metric, tagEntry); boolean found = BLOOMFILTER.mightContain(key); - // _logger.info("Bloom approx elements = {}", bloomFilter.approximateElementCount()); if(!found) { newTags = true; - if(_writesToTrieEnabled) { + if(_writesToBloomFilterEnabled) { BLOOMFILTER.put(key); - //TRIE.putIfAbsent(key, VoidValue.SINGLETON); } } } @@ -122,10 +111,9 @@ public void put(List metrics) { implementationSpecificPut(metricsToPut); } - protected abstract void implementationSpecificPut(List metrics); - protected String constructTrieKey(Metric metric, Entry tagEntry) { + protected String constructKey(Metric metric, Entry tagEntry) { StringBuilder sb = new StringBuilder(metric.getScope()); sb.append('\0').append(metric.getMetric()); @@ -140,42 +128,22 @@ protected String constructTrieKey(Metric metric, Entry tagEntry) return sb.toString(); } - protected String constructTrieKey(String scope, String metric, String tagk, String tagv, String namespace) { - StringBuilder sb = new StringBuilder(scope); - sb.append('\0').append(metric); - - if(namespace != null) { - sb.append('\0').append(namespace); - } - - if(tagk != null) { - 
sb.append('\0').append(tagk); - } - - if(tagv != null) { - sb.append('\0').append(tagv); - } - - return sb.toString(); - } - - @Override public void dispose() { requireNotDisposed(); if (_bloomFilterMonitorThread != null && _bloomFilterMonitorThread.isAlive()) { - _logger.info("Stopping old gen monitor thread."); + _logger.info("Stopping bloom filter monitor thread."); _bloomFilterMonitorThread.interrupt(); - _logger.info("Old gen monitor thread interrupted."); + _logger.info("Bloom filter monitor thread interrupted."); try { - _logger.info("Waiting for old gen monitor thread to terminate."); + _logger.info("Waiting for bloom filter monitor thread to terminate."); _bloomFilterMonitorThread.join(); } catch (InterruptedException ex) { - _logger.warn("Old gen monitor thread was interrupted while shutting down."); + _logger.warn("Bloom filter monitor thread was interrupted while shutting down."); } _logger.info("System monitoring stopped."); } else { - _logger.info("Requested shutdown of old gen monitor thread aborted, as it is not yet running."); + _logger.info("Requested shutdown of bloom filter monitor thread aborted, as it is not yet running."); } } @@ -269,7 +237,6 @@ private void _checkBloomFilterUsage() { _logger.info("Bloom expected error rate = {}", BLOOMFILTER.expectedFpp()); } - private void _flushBloomFilter() { long currentTimeInMillis = System.currentTimeMillis(); if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL){ @@ -289,5 +256,4 @@ private void _sleepForPollPeriod() { } } } - } diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 3f0a1f650..b7e6924fc 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -572,7 +572,6 @@ private void _upsert(List records) { strResponse = extractResponse(response); } catch (IOException e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? - _removeFromTrie(records); throw new SystemException(e); } @@ -592,10 +591,8 @@ private void _upsert(List records) { recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _removeFromTrie(recordsToRemove); } } catch(IOException e) { - _removeFromTrie(records); throw new SystemException("Failed to parse reponse of put metrics. The response was: " + strResponse, e); } } @@ -616,7 +613,6 @@ private void _upsertAsync(List records) { String requestBody = _mapper.writeValueAsString(msrList); entity = new StringEntity(requestBody); } catch (JsonProcessingException | UnsupportedEncodingException e) { - _removeFromTrie(records); throw new SystemException("Failed to parse metrics to schema records when indexing.", e); } @@ -641,10 +637,8 @@ public void onSuccess(Response response) { recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _removeFromTrie(recordsToRemove); } } catch(IOException e) { - _removeFromTrie(records); _logger.warn("Failed to parse reponse of put metrics. The response was: " + strResponse, e); } } @@ -652,7 +646,6 @@ public void onSuccess(Response response) { @Override public void onFailure(Exception e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? 
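// One way the recurring TODO above could be realized -- a hedged sketch, not part
// of this patch series. It assumes the _esRestClient field and HttpMethod enum used
// throughout this file; the retry cap and base delay are illustrative.

private Response performWithBackoff(String requestUrl, String requestBody) throws IOException {
    long backoffMs = 500;
    for (int attempt = 0; ; attempt++) {
        try {
            return _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl,
                    Collections.emptyMap(), new StringEntity(requestBody));
        } catch (IOException e) {
            if (attempt >= 3) {
                throw e; // retries exhausted; surface the failure to the caller
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw e;
            }
            backoffMs *= 2; // 500 ms, 1 s, 2 s between successive attempts
        }
    }
}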
- _removeFromTrie(records); _logger.warn("Failed to execute the indexing request.", e); } }; @@ -660,14 +653,6 @@ public void onFailure(Exception e) { _esRestClient.performRequestAsync(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), entity, responseListener); } - private void _removeFromTrie(List records) { - _logger.info("Removing {} records from trie.", records.size()); - for(MetricSchemaRecord record : records) { - String key = constructTrieKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); - TRIE.remove(key); - } - } - private String _constructTermAggregationQuery(MetricSchemaRecordQuery query, RecordType type) { ObjectMapper mapper = new ObjectMapper(); ObjectNode queryNode = _constructQueryNode(query, mapper); From 533358b36f5f10a863f37ed899897a54e183d34e Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 23 May 2018 12:33:21 -0700 Subject: [PATCH 08/19] Add random interval hour flush of bloom filter to prevent thundering herd problem --- .../service/schema/AbstractSchemaService.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 0b28945dc..d88d9293d 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -24,8 +24,10 @@ public abstract class AbstractSchemaService extends DefaultService implements SchemaService { - private static final long POLL_INTERVAL_MS = 60 * 1000L; - private static final long BLOOM_FILTER_FLUSH_INTERVAL = 6 * 60 * 60 * 1000L; + private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; + private static final long BLOOM_FILTER_FLUSH_INTERVAL_HOUR = 60 * 60 * 1000L; + private static final int MIN_FLUSH_HOUR_INTERVAL = 6; + private static final int MAX_FLUSH_HOUR_INTERVAL = 15; protected static BloomFilter BLOOMFILTER; private static boolean _writesToBloomFilterEnabled = true; @@ -213,10 +215,13 @@ public String getDefaultValue() { private class BloomFilterMonitorThread implements Runnable { long startTimeBeforeFlushInMillis; + int flushAtHour; @Override public void run() { startTimeBeforeFlushInMillis = System.currentTimeMillis(); + flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); + _logger.info("Initialized bloom filter flushing out, after {} hours", flushAtHour); while (!Thread.currentThread().isInterrupted()) { _sleepForPollPeriod(); if (!Thread.currentThread().isInterrupted()) { @@ -239,13 +244,18 @@ private void _checkBloomFilterUsage() { private void _flushBloomFilter() { long currentTimeInMillis = System.currentTimeMillis(); - if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL){ - _logger.info("Flushing out bloom filter entries"); + if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL_HOUR * flushAtHour){ + _logger.info("Flushing out bloom filter entries, after {} hours", flushAtHour); BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); startTimeBeforeFlushInMillis = currentTimeInMillis; } } + private int 
getRandomHourBetweenRange(int minHour, int maxHour){ + return (int) (Math.random() * (maxHour - minHour +1) + minHour); + } + private void _sleepForPollPeriod() { try { _logger.info("Sleeping for {}s before checking bloom filter statistics.", POLL_INTERVAL_MS / 1000); From e37fc47fc8d101024d41e1072ec81339be1bc0c0 Mon Sep 17 00:00:00 2001 From: Dilip Date: Fri, 25 May 2018 18:40:40 -0700 Subject: [PATCH 09/19] Add random number while looking up key in bloom filter --- .../service/schema/AbstractSchemaService.java | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index d88d9293d..ccf90be5f 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Properties; +import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,12 +31,15 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private static final int MAX_FLUSH_HOUR_INTERVAL = 15; protected static BloomFilter BLOOMFILTER; private static boolean _writesToBloomFilterEnabled = true; + private static Random rand = new Random(); + private static int randomNumber = rand.nextInt(); private int bloomFilterExpectedNumberInsertions; private double bloomFilterErrorRate; private final Logger _logger = LoggerFactory.getLogger(getClass()); private final Thread _bloomFilterMonitorThread; private final boolean _cacheEnabled; + protected final boolean _syncPut; protected AbstractSchemaService(SystemConfiguration config) { @@ -115,21 +119,6 @@ public void put(List metrics) { protected abstract void implementationSpecificPut(List metrics); - protected String constructKey(Metric metric, Entry tagEntry) { - StringBuilder sb = new StringBuilder(metric.getScope()); - sb.append('\0').append(metric.getMetric()); - - if(metric.getNamespace() != null) { - sb.append('\0').append(metric.getNamespace()); - } - - if(tagEntry != null) { - sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue()); - } - - return sb.toString(); - } - @Override public void dispose() { requireNotDisposed(); @@ -161,6 +150,24 @@ public void dispose() { @Override public abstract List keywordSearch(KeywordQuery query); + private String constructKey(Metric metric, Entry tagEntry) { + StringBuilder sb = new StringBuilder(metric.getScope()); + sb.append('\0').append(metric.getMetric()); + + if(metric.getNamespace() != null) { + sb.append('\0').append(metric.getNamespace()); + } + + if(tagEntry != null) { + sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue()); + } + + // Add randomness for each instance of bloom filter running on different + // schema clients to reduce probability of false positives that metric schemas are not written to ES + sb.append('\0').append(randomNumber); + + return sb.toString(); + } /** * The set of implementation specific configuration properties. 
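// PATCH 08 and PATCH 09 combine into the rotation shown in the next hunk's
// _flushBloomFilter; here is a condensed sketch using this class's own fields.
// A Guava bloom filter cannot delete entries, so "flushing" means building a
// brand-new filter. Rotating the per-process salt at the same time changes every
// lookup string, and drawing the next flush point uniformly from
// [MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL] hours staggers flushes
// across schema clients, avoiding a thundering herd of re-writes against ES.

private void rotateFilter() {
    BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()),
            bloomFilterExpectedNumberInsertions, bloomFilterErrorRate);
    randomNumber = rand.nextInt(); // fresh salt folded into every constructKey() result
    flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL);
}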
@@ -222,6 +229,7 @@ public void run() { startTimeBeforeFlushInMillis = System.currentTimeMillis(); flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); _logger.info("Initialized bloom filter flushing out, after {} hours", flushAtHour); + _logger.info("Initialized random number for bloom filter key = {}", randomNumber); while (!Thread.currentThread().isInterrupted()) { _sleepForPollPeriod(); if (!Thread.currentThread().isInterrupted()) { @@ -247,7 +255,10 @@ private void _flushBloomFilter() { if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL_HOUR * flushAtHour){ _logger.info("Flushing out bloom filter entries, after {} hours", flushAtHour); BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + randomNumber = rand.nextInt(); flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); + _logger.info("New random number for bloom filter key = {}", randomNumber); + _logger.info("New flush interval = {} hours", flushAtHour); startTimeBeforeFlushInMillis = currentTimeInMillis; } } From ebebf26c4378058f5e9b0a34c323d4f402c5cc2e Mon Sep 17 00:00:00 2001 From: Dilip Date: Fri, 1 Jun 2018 19:05:05 -0700 Subject: [PATCH 10/19] Remove code for removal of records on ES failure, since we will only insert to BloomFilter if ES write is successful --- .../argus/service/schema/ElasticSearchSchemaService.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index b7e6924fc..2a088315d 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -570,6 +570,8 @@ private void _upsert(List records) { String requestBody = _mapper.writeValueAsString(msrList); Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(requestBody)); strResponse = extractResponse(response); + + //add to bloom filter } catch (IOException e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? throw new SystemException(e); } @@ -579,16 +581,13 @@ private void _upsert(List records) { PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class); //TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off. if(putResponse.errors) { - List recordsToRemove = new ArrayList<>(); for(Item item : putResponse.items) { if(item.create != null && item.create.status != HttpStatus.SC_CONFLICT && item.create.status != HttpStatus.SC_CREATED) { _logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); - recordsToRemove.add(msrList.getRecord(item.create._id)); } if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist.
Error: " + new ObjectMapper().writeValueAsString(item.index.error)); - recordsToRemove.add(msrList.getRecord(item.index._id)); } } } @@ -625,16 +624,13 @@ public void onSuccess(Response response) { PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class); //TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off. if(putResponse.errors) { - List recordsToRemove = new ArrayList<>(); for(Item item : putResponse.items) { if(item.create != null && item.create.status != HttpStatus.SC_CONFLICT && item.create.status != HttpStatus.SC_CREATED) { _logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); - recordsToRemove.add(msrList.getRecord(item.create._id)); } if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error)); - recordsToRemove.add(msrList.getRecord(item.index._id)); } } } From b491ac44881c7edccee333b52a8dd58538a490cd Mon Sep 17 00:00:00 2001 From: Dilip Date: Fri, 1 Jun 2018 20:08:25 -0700 Subject: [PATCH 11/19] Add to BloomFilter only records that were successfuly written to ES --- .../service/schema/AbstractSchemaService.java | 32 +- .../schema/ElasticSearchSchemaService.java | 920 +++++++++--------- 2 files changed, 494 insertions(+), 458 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index ccf90be5f..a7a9f3a4f 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -91,9 +91,6 @@ public void put(List metrics) { boolean found = BLOOMFILTER.mightContain(key); if(!found) { metricsToPut.add(metric); - if(_writesToBloomFilterEnabled) { - BLOOMFILTER.put(key); - } } } else { boolean newTags = false; @@ -102,9 +99,6 @@ public void put(List metrics) { boolean found = BLOOMFILTER.mightContain(key); if(!found) { newTags = true; - if(_writesToBloomFilterEnabled) { - BLOOMFILTER.put(key); - } } } @@ -150,7 +144,7 @@ public void dispose() { @Override public abstract List keywordSearch(KeywordQuery query); - private String constructKey(Metric metric, Entry tagEntry) { + protected String constructKey(Metric metric, Entry tagEntry) { StringBuilder sb = new StringBuilder(metric.getScope()); sb.append('\0').append(metric.getMetric()); @@ -168,7 +162,29 @@ private String constructKey(Metric metric, Entry tagEntry) { return sb.toString(); } - + + protected String constructKey(String scope, String metric, String tagk, String tagv, String namespace) { + StringBuilder sb = new StringBuilder(scope); + sb.append('\0').append(metric); + + if(namespace != null) { + sb.append('\0').append(namespace); + } + + if(tagk != null) { + sb.append('\0').append(tagk); + } + + if(tagv != null) { + sb.append('\0').append(tagv); + } + + // Add randomness for each instance of bloom filter running on different + // schema clients to reduce probability of false positives that metric schemas are not written to ES + sb.append('\0').append(randomNumber); + + return sb.toString(); + } /** * The set of implementation specific configuration properties. 
* diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 2a088315d..df73d3833 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -65,7 +65,7 @@ */ @Singleton public class ElasticSearchSchemaService extends AbstractSchemaService { - + private static final String INDEX_NAME = "metadata_index"; private static final String TYPE_NAME = "metadata_type"; private static final String KEEP_SCROLL_CONTEXT_OPEN_FOR = "1m"; @@ -73,23 +73,23 @@ public class ElasticSearchSchemaService extends AbstractSchemaService { private static final int MAX_RETRY_TIMEOUT = 300 * 1000; private static final String FIELD_TYPE_TEXT = "text"; private static final String FIELD_TYPE_DATE ="date"; - + private final ObjectMapper _mapper; - private Logger _logger = LoggerFactory.getLogger(getClass()); - private final MonitorService _monitorService; - private final RestClient _esRestClient; - private final int _replicationFactor; + private Logger _logger = LoggerFactory.getLogger(getClass()); + private final MonitorService _monitorService; + private final RestClient _esRestClient; + private final int _replicationFactor; private final int _numShards; private final int _bulkIndexingSize; private HashAlgorithm _idgenHashAlgo; - - @Inject + + @Inject public ElasticSearchSchemaService(SystemConfiguration config, MonitorService monitorService) { super(config); - + _monitorService = monitorService; _mapper = _createObjectMapper(); - + String algorithm = config.getValue(Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getName(), Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getDefaultValue()); try { _idgenHashAlgo = HashAlgorithm.fromString(algorithm); @@ -97,21 +97,21 @@ public ElasticSearchSchemaService(SystemConfiguration config, MonitorService mon _logger.warn("{} is not supported by this service. 
Valid values are: {}.", algorithm, Arrays.asList(HashAlgorithm.values())); _idgenHashAlgo = HashAlgorithm.MD5; } - + _logger.info("Using {} for Elasticsearch document id generation.", _idgenHashAlgo); - + _replicationFactor = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_NUM_REPLICAS.getName(), Property.ELASTICSEARCH_NUM_REPLICAS.getDefaultValue())); - + _numShards = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_SHARDS_COUNT.getName(), Property.ELASTICSEARCH_SHARDS_COUNT.getDefaultValue())); - + _bulkIndexingSize = Integer.parseInt( config.getValue(Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getName(), Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getDefaultValue())); - + String[] nodes = config.getValue(Property.ELASTICSEARCH_ENDPOINT.getName(), Property.ELASTICSEARCH_ENDPOINT.getDefaultValue()).split(","); HttpHost[] httpHosts = new HttpHost[nodes.length]; - + for(int i=0; i metrics) { SystemAssert.requireArgument(metrics != null, "Metrics list cannot be null."); - + _logger.info("{} new metrics need to be indexed on ES.", metrics.size()); - + long start = System.currentTimeMillis(); List> fracturedList = _fracture(metrics); - + for(List records : fracturedList) { if(!records.isEmpty()) { if(_syncPut) { @@ -205,22 +205,22 @@ protected void implementationSpecificPut(List metrics) { } } } - + int count = 0; for(List records : fracturedList) { count += records.size(); } - + _monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITTEN, count, null); _monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITE_LATENCY, (System.currentTimeMillis() - start), null); } - + /* Convert the given list of metrics to a list of metric schema records. At the same time, fracture the records list * if its size is greater than INDEXING_BATCH_SIZE. 
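// The batching described above, reduced to its core: a generic sketch (the real
// _fracture also flattens each Metric's tags into MetricSchemaRecords while it
// partitions; imports java.util.ArrayList and java.util.List).

static <T> List<List<T>> partition(List<T> records, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>(batchSize);
    for (T record : records) {
        current.add(record);
        if (current.size() == batchSize) {    // batch is full; seal it
            batches.add(current);
            current = new ArrayList<>(batchSize);
        }
    }
    batches.add(current); // like _fracture, the tail batch may be empty,
    return batches;       // so callers check isEmpty() before indexing
}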
*/ private List> _fracture(List metrics) { List> fracturedList = new ArrayList<>(); - + List records = new ArrayList<>(_bulkIndexingSize); for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { @@ -233,41 +233,41 @@ private List> _fracture(List metrics) { } continue; } - + for(Map.Entry entry : metric.getTags().entrySet()) { records.add(new MetricSchemaRecord(metric.getNamespace(), metric.getScope(), metric.getMetric(), - entry.getKey(), entry.getValue())); + entry.getKey(), entry.getValue())); if(records.size() == _bulkIndexingSize) { fracturedList.add(records); records = new ArrayList<>(_bulkIndexingSize); } } } - + fracturedList.add(records); return fracturedList; } - + @Override public List get(MetricSchemaRecordQuery query) { requireNotDisposed(); - SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); - long size = (long) query.getLimit() * query.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "REGEXP_WITHOUT_AGGREGATION"); - long start = System.currentTimeMillis(); - boolean scroll = false; + SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); + long size = (long) query.getLimit() * query.getPage(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "REGEXP_WITHOUT_AGGREGATION"); + long start = System.currentTimeMillis(); + boolean scroll = false; StringBuilder sb = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search"); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search"); + int from = 0, scrollSize; if(query.getLimit() * query.getPage() > 10000) { sb.append("?scroll=").append(KEEP_SCROLL_CONTEXT_OPEN_FOR); @@ -278,94 +278,94 @@ public List get(MetricSchemaRecordQuery query) { from = query.getLimit() * (query.getPage() - 1); scrollSize = query.getLimit(); } - + String requestUrl = sb.toString(); String queryJson = _constructTermQuery(query, from, scrollSize); - + try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); - + MetricSchemaRecordList list = toEntity(extractResponse(response), new TypeReference() {}); - + if(scroll) { requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString(); List records = new LinkedList<>(list.getRecords()); - + while(true) { String scrollID = list.getScrollID(); - + Map requestBody = new HashMap<>(); requestBody.put("scroll_id", scrollID); requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR); - + response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(new ObjectMapper().writeValueAsString(requestBody))); - + list = toEntity(extractResponse(response), new TypeReference() {}); records.addAll(list.getRecords()); - + if(records.size() >= query.getLimit() * query.getPage() || list.getRecords().size() < scrollSize) { break; } } - + int fromIndex = query.getLimit() * (query.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, 
(System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, records.size()); - + } else { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return list.getRecords(); } - + } catch (UnsupportedEncodingException | JsonProcessingException e) { throw new SystemException("Search failed.", e); } catch (IOException e) { throw new SystemException("IOException when trying to perform ES request.", e); } } - + @Override public List getUnique(MetricSchemaRecordQuery query, RecordType type) { requireNotDisposed(); SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null."); long size = (long) query.getLimit() * query.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "REGEXP_WITH_AGGREGATION"); - long start = System.currentTimeMillis(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "REGEXP_WITH_AGGREGATION"); + long start = System.currentTimeMillis(); String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search") + .toString(); + String queryJson = _constructTermAggregationQuery(query, type); try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); String str = extractResponse(response); List records = SchemaService.constructMetricSchemaRecordsForType( - toEntity(str, new TypeReference>() {}), type); - + toEntity(str, new TypeReference>() {}), type); + int fromIndex = query.getLimit() * (query.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + if(records.size() < query.getLimit() * query.getPage()) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); @@ -379,31 +379,31 @@ public List getUnique(MetricSchemaRecordQuery query, RecordT throw new SystemException(e); } } - + @Override public List keywordSearch(KeywordQuery kq) { requireNotDisposed(); - SystemAssert.requireArgument(kq != null, "Query cannot be null."); - SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null."); - - long size = (long) kq.getLimit() * kq.getPage(); - SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, - "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); - - - Map tags = new HashMap<>(); - tags.put("type", "FTS_WITH_AGGREGATION"); - long start = System.currentTimeMillis(); + 
SystemAssert.requireArgument(kq != null, "Query cannot be null."); + SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null."); + + long size = (long) kq.getLimit() * kq.getPage(); + SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, + "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE"); + + + Map tags = new HashMap<>(); + tags.put("type", "FTS_WITH_AGGREGATION"); + long start = System.currentTimeMillis(); StringBuilder sb = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_search"); + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_search"); try { - + if(kq.getQuery() != null) { - + int from = 0, scrollSize = 0; boolean scroll = false;; if(kq.getLimit() * kq.getPage() > 10000) { @@ -415,82 +415,82 @@ public List keywordSearch(KeywordQuery kq) { from = kq.getLimit() * (kq.getPage() - 1); scrollSize = kq.getLimit(); } - + List tokens = _analyzedTokens(kq.getQuery()); String queryJson = _constructQueryStringQuery(tokens, from, scrollSize); String requestUrl = sb.toString(); - + Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); String strResponse = extractResponse(response); MetricSchemaRecordList list = toEntity(strResponse, new TypeReference() {}); - + if(scroll) { requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString(); List records = new LinkedList<>(list.getRecords()); - + while(true) { Map requestBody = new HashMap<>(); requestBody.put("scroll_id", list.getScrollID()); requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR); - + response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(new ObjectMapper().writeValueAsString(requestBody))); - + list = toEntity(extractResponse(response), new TypeReference() {}); - + records.addAll(list.getRecords()); - + if(records.size() >= kq.getLimit() * kq.getPage() || list.getRecords().size() < scrollSize) { break; } } - + int fromIndex = kq.getLimit() * (kq.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, records.size()); - + } else { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return list.getRecords(); } - - + + } else { Map> tokensMap = new HashMap<>(); - + List tokens = _analyzedTokens(kq.getScope()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.SCOPE, tokens); } - + tokens = _analyzedTokens(kq.getMetric()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.METRIC, tokens); } - + tokens = _analyzedTokens(kq.getTagKey()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.TAGK, tokens); } - + tokens = _analyzedTokens(kq.getTagValue()); if(!tokens.isEmpty()) { tokensMap.put(RecordType.TAGV, tokens); } - + tokens = _analyzedTokens(kq.getNamespace()); 
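// For intuition, given the metadata_analyzer defined in _createSettingsNode (a pattern
// tokenizer that splits on non-alphanumeric runs and camelCase boundaries, followed by a
// lowercase filter), _analyzedTokens would be expected to behave roughly like:
//
//   _analyzedTokens("MyScope.prod")  ->  ["my", "scope", "prod"]
//   _analyzedTokens(null)            ->  []   (containsFilter() is false, so no tokens)
//
// Each non-empty token list collected here becomes one simple_query_string clause,
// scoped to its RecordType field, inside the final bool filter.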
if(!tokens.isEmpty()) { tokensMap.put(RecordType.NAMESPACE, tokens); } - + String queryJson = _constructQueryStringQuery(kq, tokensMap); String requestUrl = sb.toString(); Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson)); @@ -498,14 +498,14 @@ public List keywordSearch(KeywordQuery kq) { List records = SchemaService.constructMetricSchemaRecordsForType( toEntity(strResponse, new TypeReference>() {}), kq.getType()); - + int fromIndex = kq.getLimit() * (kq.getPage() - 1); if(records.size() <= fromIndex) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return Collections.emptyList(); } - + if(records.size() < kq.getLimit() * kq.getPage()) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags); _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); @@ -515,27 +515,27 @@ public List keywordSearch(KeywordQuery kq) { _monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags); return records.subList(fromIndex, kq.getLimit() * kq.getPage()); } - + } - + } catch (IOException e) { throw new SystemException(e); } } - - + + private List _analyzedTokens(String query) { - + if(!SchemaService.containsFilter(query)) { return Collections.emptyList(); } - + List tokens = new ArrayList<>(); - + String requestUrl = new StringBuilder("/").append(INDEX_NAME).append("/_analyze").toString(); - + String requestBody = "{\"analyzer\" : \"metadata_analyzer\", \"text\": \"" + query + "\" }"; - + try { Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(requestBody)); String strResponse = extractResponse(response); @@ -545,7 +545,7 @@ private List _analyzedTokens(String query) { tokens.add(tokenNode.get("token").asText()); } } - + return tokens; } catch (IOException e) { throw new SystemException(e); @@ -554,58 +554,63 @@ private List _analyzedTokens(String query) { private void _upsert(List records) { - + String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_bulk") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_bulk") + .toString(); + String strResponse = ""; - + MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo); try { String requestBody = _mapper.writeValueAsString(msrList); Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(requestBody)); strResponse = extractResponse(response); - - //add to bloom filter } catch (IOException e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? throw new SystemException(e); } - + try { PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class); //TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off. 
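// For reference, a _bulk response with errors has roughly the following shape (the field
// names mirror the PutResponse/Item/CreateItem/Error classes below; the index/type/id and
// error values here are illustrative only):
//
//   { "took": 30, "errors": true, "items": [
//       { "create": { "_index": "metadata_index", "_type": "metadata_type", "_id": "a1b2c3",
//                     "status": 429,
//                     "error": { "type": "es_rejected_execution_exception",
//                                "reason": "rejected execution of bulk item ..." } } } ] }
//
// A per-item 409 (conflict) means the document already exists, which is why it is
// deliberately not treated as a failure in the check below.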
if(putResponse.errors) { + List recordsToRemove = new ArrayList<>(); for(Item item : putResponse.items) { if(item.create != null && item.create.status != HttpStatus.SC_CONFLICT && item.create.status != HttpStatus.SC_CREATED) { _logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); + recordsToRemove.add(msrList.getRecord(item.create._id)); } - + if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error)); + recordsToRemove.add(msrList.getRecord(item.index._id)); } } + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); } + //add to bloom filter + _addToBloomFilter(records); } catch(IOException e) { throw new SystemException("Failed to parse reponse of put metrics. The response was: " + strResponse, e); } } - + private void _upsertAsync(List records) { - + String requestUrl = new StringBuilder().append("/") - .append(INDEX_NAME) - .append("/") - .append(TYPE_NAME) - .append("/") - .append("_bulk") - .toString(); - + .append(INDEX_NAME) + .append("/") + .append(TYPE_NAME) + .append("/") + .append("_bulk") + .toString(); + MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo); StringEntity entity; try { @@ -614,9 +619,9 @@ private void _upsertAsync(List records) { } catch (JsonProcessingException | UnsupportedEncodingException e) { throw new SystemException("Failed to parse metrics to schema records when indexing.", e); } - + ResponseListener responseListener = new ResponseListener() { - + @Override public void onSuccess(Response response) { String strResponse = extractResponse(response); @@ -624,76 +629,91 @@ public void onSuccess(Response response) { PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class); //TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off. if(putResponse.errors) { + List recordsToRemove = new ArrayList<>(); for(Item item : putResponse.items) { if(item.create != null && item.create.status != HttpStatus.SC_CONFLICT && item.create.status != HttpStatus.SC_CREATED) { _logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error)); + recordsToRemove.add(msrList.getRecord(item.create._id)); } - + if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) { _logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error)); + recordsToRemove.add(msrList.getRecord(item.index._id)); } } - } + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); + } + //add to bloom filter + _addToBloomFilter(records); } catch(IOException e) { _logger.warn("Failed to parse reponse of put metrics. The response was: " + strResponse, e); } } - + @Override public void onFailure(Exception e) { //TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException?? 
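// A minimal sketch of the back-off retry this TODO describes. MAX_RETRIES, the retries
// counter and _scheduler are illustrative names, not part of this patch; the client call
// is the same performRequestAsync invoked below, and "this" re-registers this listener.
//
//   if (retries.incrementAndGet() <= MAX_RETRIES) {
//       long delayMs = 100L * (1L << retries.get());  // 200ms, 400ms, 800ms, ...
//       _scheduler.schedule(() -> _esRestClient.performRequestAsync(
//               HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(),
//               entity, this), delayMs, TimeUnit.MILLISECONDS);
//       return;
//   }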
_logger.warn("Failed to execute the indexing request.", e); } }; - + _esRestClient.performRequestAsync(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), entity, responseListener); } - + + private void _addToBloomFilter(List records){ + _logger.info("Adding {} records into bloom filter.", records.size()); + for(MetricSchemaRecord record : records) { + String key = constructKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); + BLOOMFILTER.put(key); + } + } + private String _constructTermAggregationQuery(MetricSchemaRecordQuery query, RecordType type) { ObjectMapper mapper = new ObjectMapper(); ObjectNode queryNode = _constructQueryNode(query, mapper); - + long size = query.getLimit() * query.getPage(); SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE, "(limit * page) must be greater than 0 and less than Integer.MAX_VALUE"); - + ObjectNode aggsNode = _constructAggsNode(type, Math.max(size, 10000), mapper); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("size", 0); rootNode.put("aggs", aggsNode); - + return rootNode.toString(); } - + private String _constructTermQuery(MetricSchemaRecordQuery query, int from, int size) { ObjectMapper mapper = new ObjectMapper(); - + ObjectNode queryNode = _constructQueryNode(query, mapper); - + ObjectNode rootNode = _mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("from", from); rootNode.put("size", size); - + return rootNode.toString(); } - + private ObjectNode _constructSimpleQueryStringNode(List tokens, RecordType... types) { - + if(tokens.isEmpty()) { return null; } - + ObjectMapper mapper = new ObjectMapper(); - + StringBuilder queryString = new StringBuilder(); for(String token : tokens) { queryString.append('+').append(token).append(' '); } queryString.replace(queryString.length() - 1, queryString.length(), "*"); - + ObjectNode node = mapper.createObjectNode(); ArrayNode fieldsNode = mapper.createArrayNode(); for(RecordType type : types) { @@ -701,54 +721,54 @@ private ObjectNode _constructSimpleQueryStringNode(List tokens, RecordTy } node.put("fields", fieldsNode); node.put("query", queryString.toString()); - + ObjectNode simpleQueryStringNode = mapper.createObjectNode(); simpleQueryStringNode.put("simple_query_string", node); - + return simpleQueryStringNode; } - + private String _constructQueryStringQuery(List tokens, int from, int size) { ObjectMapper mapper = new ObjectMapper(); - + ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(tokens, RecordType.values()); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", simpleQueryStringNode); rootNode.put("from", from); rootNode.put("size", size); - + return rootNode.toString(); } - + private String _constructQueryStringQuery(KeywordQuery kq, Map> tokensMap) { ObjectMapper mapper = new ObjectMapper(); - + ArrayNode filterNodes = mapper.createArrayNode(); for(Map.Entry> entry : tokensMap.entrySet()) { ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(entry.getValue(), entry.getKey()); filterNodes.add(simpleQueryStringNode); } - + ObjectNode boolNode = mapper.createObjectNode(); boolNode.put("filter", filterNodes); - + ObjectNode queryNode = mapper.createObjectNode(); queryNode.put("bool", boolNode); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("query", queryNode); rootNode.put("size", 0); - + long size = kq.getLimit() * kq.getPage(); SystemAssert.requireArgument(size 
> 0 && size <= Integer.MAX_VALUE, "(limit * page) must be greater than 0 and less than Integer.MAX_VALUE"); rootNode.put("aggs", _constructAggsNode(kq.getType(), Math.max(size, 10000), mapper)); - + return rootNode.toString(); - + } - + private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapper mapper) { ArrayNode filterNodes = mapper.createArrayNode(); if(SchemaService.containsFilter(query.getMetric())) { @@ -758,7 +778,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getScope())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -766,7 +786,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getTagKey())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -774,7 +794,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getTagValue())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -782,7 +802,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + if(SchemaService.containsFilter(query.getNamespace())) { ObjectNode node = mapper.createObjectNode(); ObjectNode regexpNode = mapper.createObjectNode(); @@ -790,186 +810,186 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp node.put("regexp", regexpNode); filterNodes.add(node); } - + ObjectNode boolNode = mapper.createObjectNode(); boolNode.put("filter", filterNodes); - + ObjectNode queryNode = mapper.createObjectNode(); queryNode.put("bool", boolNode); return queryNode; } - - + + private ObjectNode _constructAggsNode(RecordType type, long limit, ObjectMapper mapper) { - + ObjectNode termsNode = mapper.createObjectNode(); termsNode.put("field", type.getName() + ".raw"); termsNode.put("order", mapper.createObjectNode().put("_term", "asc")); termsNode.put("size", limit); termsNode.put("execution_hint", "map"); - + ObjectNode distinctValuesNode = mapper.createObjectNode(); distinctValuesNode.put("terms", termsNode); - + ObjectNode aggsNode = mapper.createObjectNode(); aggsNode.put("distinct_values", distinctValuesNode); return aggsNode; } - - + + /* Helper method to convert JSON String representation to the corresponding Java entity. */ - private T toEntity(String content, TypeReference type) { - try { - return _mapper.readValue(content, type); - } catch (IOException ex) { - throw new SystemException(ex); - } - } - - + private T toEntity(String content, TypeReference type) { + try { + return _mapper.readValue(content, type); + } catch (IOException ex) { + throw new SystemException(ex); + } + } + + /** Helper to process the response. * Throws a SystemException when the http status code is outsdie of the range 200 - 300. 
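* For example, a status of 200-299 returns the response body as a string, while any
* other status (say a hypothetical 404 or 503) is wrapped in a SystemException
* together with the response body.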
*/ - private String extractResponse(Response response) { - requireArgument(response != null, "HttpResponse object cannot be null."); - - int status = response.getStatusLine().getStatusCode(); - String strResponse = extractStringResponse(response); - - if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) { - throw new SystemException("Status code: " + status + " . Error occurred. " + strResponse); - } else { - return strResponse; - } - } - - private String extractStringResponse(Response content) { - requireArgument(content != null, "Response content is null."); - - String result; - HttpEntity entity = null; - - try { - entity = content.getEntity(); - if (entity == null) { - result = ""; - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - entity.writeTo(baos); - result = baos.toString("UTF-8"); - } - return result; - } catch (IOException ex) { - throw new SystemException(ex); - } finally { - if (entity != null) { - try { - EntityUtils.consume(entity); - } catch (IOException ex) { - _logger.warn("Failed to close entity stream.", ex); - } - } - } - } - + private String extractResponse(Response response) { + requireArgument(response != null, "HttpResponse object cannot be null."); + + int status = response.getStatusLine().getStatusCode(); + String strResponse = extractStringResponse(response); + + if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) { + throw new SystemException("Status code: " + status + " . Error occurred. " + strResponse); + } else { + return strResponse; + } + } + + private String extractStringResponse(Response content) { + requireArgument(content != null, "Response content is null."); + + String result; + HttpEntity entity = null; + + try { + entity = content.getEntity(); + if (entity == null) { + result = ""; + } else { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + entity.writeTo(baos); + result = baos.toString("UTF-8"); + } + return result; + } catch (IOException ex) { + throw new SystemException(ex); + } finally { + if (entity != null) { + try { + EntityUtils.consume(entity); + } catch (IOException ex) { + _logger.warn("Failed to close entity stream.", ex); + } + } + } + } + private ObjectMapper _createObjectMapper() { ObjectMapper mapper = new ObjectMapper(); - + mapper.setSerializationInclusion(Include.NON_NULL); SimpleModule module = new SimpleModule(); module.addSerializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Serializer()); module.addDeserializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Deserializer()); module.addDeserializer(List.class, new MetricSchemaRecordList.AggDeserializer()); mapper.registerModule(module); - + return mapper; } - - - private ObjectNode _createSettingsNode() { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode metadataAnalyzer = mapper.createObjectNode(); - metadataAnalyzer.put("tokenizer", "metadata_tokenizer"); - metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase")); - - ObjectNode analyzerNode = mapper.createObjectNode(); - analyzerNode.put("metadata_analyzer", metadataAnalyzer); - - ObjectNode tokenizerNode = mapper.createObjectNode(); - tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])")); - - ObjectNode analysisNode = mapper.createObjectNode(); - analysisNode.put("analyzer", analyzerNode); - analysisNode.put("tokenizer", 
tokenizerNode); - - ObjectNode indexNode = mapper.createObjectNode(); - indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW); - indexNode.put("number_of_replicas", _replicationFactor); - indexNode.put("number_of_shards", _numShards); - - ObjectNode settingsNode = mapper.createObjectNode(); - settingsNode.put("analysis", analysisNode); - settingsNode.put("index", indexNode); - - return settingsNode; - - } - - - private ObjectNode _createMappingsNode() { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode propertiesNode = mapper.createObjectNode(); - propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - propertiesNode.put(RecordType.NAMESPACE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); - - propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE)); - - ObjectNode typeNode = mapper.createObjectNode(); - typeNode.put("properties", propertiesNode); - - ObjectNode mappingsNode = mapper.createObjectNode(); - mappingsNode.put(TYPE_NAME, typeNode); - - return mappingsNode; - } - - - private ObjectNode _createFieldNode(String type) { - ObjectMapper mapper = new ObjectMapper(); - - ObjectNode fieldNode = mapper.createObjectNode(); - fieldNode.put("type", type); - fieldNode.put("analyzer", "metadata_analyzer"); - ObjectNode keywordNode = mapper.createObjectNode(); - keywordNode.put("type", "keyword"); - ObjectNode fieldsNode = mapper.createObjectNode(); - fieldsNode.put("raw", keywordNode); - fieldNode.put("fields", fieldsNode); - return fieldNode; - } - + + + private ObjectNode _createSettingsNode() { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode metadataAnalyzer = mapper.createObjectNode(); + metadataAnalyzer.put("tokenizer", "metadata_tokenizer"); + metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase")); + + ObjectNode analyzerNode = mapper.createObjectNode(); + analyzerNode.put("metadata_analyzer", metadataAnalyzer); + + ObjectNode tokenizerNode = mapper.createObjectNode(); + tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])")); + + ObjectNode analysisNode = mapper.createObjectNode(); + analysisNode.put("analyzer", analyzerNode); + analysisNode.put("tokenizer", tokenizerNode); + + ObjectNode indexNode = mapper.createObjectNode(); + indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW); + indexNode.put("number_of_replicas", _replicationFactor); + indexNode.put("number_of_shards", _numShards); + + ObjectNode settingsNode = mapper.createObjectNode(); + settingsNode.put("analysis", analysisNode); + settingsNode.put("index", indexNode); + + return settingsNode; + + } + + + private ObjectNode _createMappingsNode() { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode propertiesNode = mapper.createObjectNode(); + propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT)); + propertiesNode.put(RecordType.NAMESPACE.getName(), 
_createFieldNode(FIELD_TYPE_TEXT)); + + propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE)); + + ObjectNode typeNode = mapper.createObjectNode(); + typeNode.put("properties", propertiesNode); + + ObjectNode mappingsNode = mapper.createObjectNode(); + mappingsNode.put(TYPE_NAME, typeNode); + + return mappingsNode; + } + + + private ObjectNode _createFieldNode(String type) { + ObjectMapper mapper = new ObjectMapper(); + + ObjectNode fieldNode = mapper.createObjectNode(); + fieldNode.put("type", type); + fieldNode.put("analyzer", "metadata_analyzer"); + ObjectNode keywordNode = mapper.createObjectNode(); + keywordNode.put("type", "keyword"); + ObjectNode fieldsNode = mapper.createObjectNode(); + fieldsNode.put("raw", keywordNode); + fieldNode.put("fields", fieldsNode); + return fieldNode; + } + private void _createIndexIfNotExists() { try { Response response = _esRestClient.performRequest(HttpMethod.HEAD.getName(), "/" + INDEX_NAME); boolean indexExists = response.getStatusLine().getStatusCode() == HttpStatus.SC_OK ? true : false; - + if(!indexExists) { _logger.info("Index [" + INDEX_NAME + "] does not exist. Will create one."); ObjectMapper mapper = new ObjectMapper(); - + ObjectNode rootNode = mapper.createObjectNode(); rootNode.put("settings", _createSettingsNode()); rootNode.put("mappings", _createMappingsNode()); - + String settingsAndMappingsJson = rootNode.toString(); String requestUrl = new StringBuilder().append("/").append(INDEX_NAME).toString(); - + response = _esRestClient.performRequest(HttpMethod.PUT.getName(), requestUrl, Collections.emptyMap(), new StringEntity(settingsAndMappingsJson)); extractResponse(response); } @@ -977,26 +997,26 @@ private void _createIndexIfNotExists() { _logger.error("Failed to check/create elasticsearch index. ElasticSearchSchemaService may not function.", e); } } - - /** - * Enumeration of supported HTTP methods. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - private enum HttpMethod { - - /** POST operation. */ - POST("POST"), - /** PUT operation. */ - PUT("PUT"), - /** HEAD operation. */ - HEAD("HEAD"); - - private String name; - - HttpMethod(String name) { - this.setName(name); - } + + /** + * Enumeration of supported HTTP methods. + * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + private enum HttpMethod { + + /** POST operation. */ + POST("POST"), + /** PUT operation. */ + PUT("PUT"), + /** HEAD operation. */ + HEAD("HEAD"); + + private String name; + + HttpMethod(String name) { + this.setName(name); + } public String getName() { return name; @@ -1005,70 +1025,70 @@ public String getName() { public void setName(String name) { this.name = name; } - } - - + } + + /** - * The set of implementation specific configuration properties. - * - * @author Bhinav Sura (bhinav.sura@salesforce.com) - */ - public enum Property { - - ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"), - /** Connection timeout for ES REST client. */ - ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"), - /** Socket connection timeout for ES REST client. */ - ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"), - /** Connection count for ES REST client. */ - ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"), - /** Replication factor for metadata_index. 
*/ - ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"), - /** Shard count for metadata_index. */ - ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"), - /** The no. of records to batch for bulk indexing requests. - * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests - */ - ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"), - /** The hashing algorithm to use for generating document id. */ - ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5"); - - private final String _name; - private final String _defaultValue; - - private Property(String name, String defaultValue) { - _name = name; - _defaultValue = defaultValue; - } - - /** - * Returns the property name. - * - * @return The property name. - */ - public String getName() { - return _name; - } - - /** - * Returns the default value for the property. - * - * @return The default value. - */ - public String getDefaultValue() { - return _defaultValue; - } - } - - - static class PutResponse { - private int took; - private boolean errors; - private List items; - - public PutResponse() {} - - public int getTook() { + * The set of implementation specific configuration properties. + * + * @author Bhinav Sura (bhinav.sura@salesforce.com) + */ + public enum Property { + + ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"), + /** Connection timeout for ES REST client. */ + ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"), + /** Socket connection timeout for ES REST client. */ + ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"), + /** Connection count for ES REST client. */ + ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"), + /** Replication factor for metadata_index. */ + ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"), + /** Shard count for metadata_index. */ + ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"), + /** The no. of records to batch for bulk indexing requests. + * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests + */ + ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"), + /** The hashing algorithm to use for generating document id. */ + ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5"); + + private final String _name; + private final String _defaultValue; + + private Property(String name, String defaultValue) { + _name = name; + _defaultValue = defaultValue; + } + + /** + * Returns the property name. + * + * @return The property name. + */ + public String getName() { + return _name; + } + + /** + * Returns the default value for the property. + * + * @return The default value. 
+ */ + public String getDefaultValue() { + return _defaultValue; + } + } + + + static class PutResponse { + private int took; + private boolean errors; + private List items; + + public PutResponse() {} + + public int getTook() { return took; } @@ -1094,10 +1114,10 @@ public void setItems(List items) { @JsonIgnoreProperties(ignoreUnknown = true) static class Item { - private CreateItem create; - private CreateItem index; - - public Item() {} + private CreateItem create; + private CreateItem index; + + public Item() {} public CreateItem getCreate() { return create; } @@ -1106,7 +1126,7 @@ public CreateItem getCreate() { public void setCreate(CreateItem create) { this.create = create; } - + public CreateItem getIndex() { return index; } @@ -1114,46 +1134,46 @@ public CreateItem getIndex() { public void setIndex(CreateItem index) { this.index = index; } - } - + } + @JsonIgnoreProperties(ignoreUnknown = true) - static class CreateItem { - private String _index; - private String _type; - private String _id; - private int status; - private Error error; - - public CreateItem() {} - + static class CreateItem { + private String _index; + private String _type; + private String _id; + private int status; + private Error error; + + public CreateItem() {} + public String get_index() { return _index; } - + public void set_index(String _index) { this._index = _index; } - + public String get_type() { return _type; } - + public void set_type(String _type) { this._type = _type; } - + public String get_id() { return _id; } - + public void set_id(String _id) { this._id = _id; } - + public int getStatus() { return status; } - + public void setStatus(int status) { this.status = status; } @@ -1165,13 +1185,13 @@ public Error getError() { return error; } public void setError(Error error) { this.error = error; } - } - + } + @JsonIgnoreProperties(ignoreUnknown = true) static class Error { private String type; private String reason; - + public Error() {} public String getType() { return type; } @@ -1190,6 +1210,6 @@ public void setReason(String reason) { this.reason = reason; } } - } + } } From 84b701c70d6f492d1718443e21c99962a2c2fc01 Mon Sep 17 00:00:00 2001 From: Dilip Date: Tue, 5 Jun 2018 11:22:39 -0700 Subject: [PATCH 12/19] Flush bloom filter at a fixed time of day based on a configurable property --- .../service/schema/AbstractSchemaService.java | 104 +++++++++++------- .../schema/ElasticSearchSchemaService.java | 12 +- 2 files changed, 72 insertions(+), 44 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index a7a9f3a4f..8bb4ca388 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -3,10 +3,14 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Calendar; import java.util.List; import java.util.Map.Entry; import java.util.Properties; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,9 +30,6 @@ public abstract class AbstractSchemaService extends DefaultService implements SchemaService { private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; - private static final long BLOOM_FILTER_FLUSH_INTERVAL_HOUR = 60 * 60
* 1000L; - private static final int MIN_FLUSH_HOUR_INTERVAL = 6; - private static final int MAX_FLUSH_HOUR_INTERVAL = 15; protected static BloomFilter BLOOMFILTER; private static boolean _writesToBloomFilterEnabled = true; private static Random rand = new Random(); @@ -39,9 +40,10 @@ public abstract class AbstractSchemaService extends DefaultService implements Sc private final Logger _logger = LoggerFactory.getLogger(getClass()); private final Thread _bloomFilterMonitorThread; private final boolean _cacheEnabled; - protected final boolean _syncPut; - + private int bloomFilterFlushHourToStartAt; + private ScheduledExecutorService scheduledExecutorService; + protected AbstractSchemaService(SystemConfiguration config) { super(config); @@ -60,6 +62,10 @@ protected AbstractSchemaService(SystemConfiguration config) { if(_cacheEnabled) { _bloomFilterMonitorThread.start(); } + + bloomFilterFlushHourToStartAt = Integer.parseInt(config.getValue(Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getName(), + Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getDefaultValue())); + createScheduledExecutorService(bloomFilterFlushHourToStartAt); } @Override @@ -130,6 +136,7 @@ public void dispose() { } else { _logger.info("Requested shutdown of bloom filter monitor thread aborted, as it is not yet running."); } + shutdownScheduledExecutorService(); } @Override @@ -155,36 +162,62 @@ protected String constructKey(Metric metric, Entry tagEntry) { if(tagEntry != null) { sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue()); } - + // Add randomness for each instance of bloom filter running on different // schema clients to reduce probability of false positives that metric schemas are not written to ES sb.append('\0').append(randomNumber); return sb.toString(); } - + protected String constructKey(String scope, String metric, String tagk, String tagv, String namespace) { StringBuilder sb = new StringBuilder(scope); sb.append('\0').append(metric); - + if(namespace != null) { sb.append('\0').append(namespace); } - + if(tagk != null) { sb.append('\0').append(tagk); } - + if(tagv != null) { sb.append('\0').append(tagv); } - + // Add randomness for each instance of bloom filter running on different // schema clients to reduce probability of false positives that metric schemas are not written to ES sb.append('\0').append(randomNumber); - + return sb.toString(); } + + private void createScheduledExecutorService(int targetHourToStartAt){ + scheduledExecutorService = Executors.newScheduledThreadPool(1); + int initialDelayInSeconds = getNumHoursUntilTargetHour(targetHourToStartAt) * 60 * 60; + BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread(); + scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, 24 * 60 *60, TimeUnit.SECONDS); + } + + private void shutdownScheduledExecutorService(){ + _logger.info("Shutting down scheduled bloom filter flush executor service"); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + _logger.warn("Shutdown of executor service was interrupted."); + Thread.currentThread().interrupt(); + } + } + + private int getNumHoursUntilTargetHour(int targetHour){ + _logger.info("Initialized bloom filter flushing out, at {} hour of day", targetHour); + Calendar calendar = Calendar.getInstance(); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + return hour < targetHour ? 
(targetHour - hour) : (targetHour + 24 - hour); + } + /** * The set of implementation specific configuration properties. * @@ -198,7 +231,8 @@ public enum Property { SYNC_PUT("service.property.schema.sync.put", "false"), BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), - BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"); + BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"), + BLOOM_FILTER_FLUSH_HOUR_TO_START_AT("service.property.schema.bloomfilter.flush.hour.to.start.at","2"); private final String _name; private final String _defaultValue; @@ -236,24 +270,14 @@ public String getDefaultValue() { * @author Dilip Devaraj (ddevaraj@salesforce.com) */ private class BloomFilterMonitorThread implements Runnable { - - long startTimeBeforeFlushInMillis; - int flushAtHour; - @Override public void run() { - startTimeBeforeFlushInMillis = System.currentTimeMillis(); - flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); - _logger.info("Initialized bloom filter flushing out, after {} hours", flushAtHour); _logger.info("Initialized random number for bloom filter key = {}", randomNumber); while (!Thread.currentThread().isInterrupted()) { _sleepForPollPeriod(); if (!Thread.currentThread().isInterrupted()) { try { _checkBloomFilterUsage(); - - // flush out bloom filter every K hours - _flushBloomFilter(); } catch (Exception ex) { _logger.warn("Exception occurred while checking bloom filter usage.", ex); } @@ -266,23 +290,6 @@ private void _checkBloomFilterUsage() { _logger.info("Bloom expected error rate = {}", BLOOMFILTER.expectedFpp()); } - private void _flushBloomFilter() { - long currentTimeInMillis = System.currentTimeMillis(); - if((currentTimeInMillis - startTimeBeforeFlushInMillis) > BLOOM_FILTER_FLUSH_INTERVAL_HOUR * flushAtHour){ - _logger.info("Flushing out bloom filter entries, after {} hours", flushAtHour); - BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); - randomNumber = rand.nextInt(); - flushAtHour = getRandomHourBetweenRange(MIN_FLUSH_HOUR_INTERVAL, MAX_FLUSH_HOUR_INTERVAL); - _logger.info("New random number for bloom filter key = {}", randomNumber); - _logger.info("New flush interva = {} hours", flushAtHour); - startTimeBeforeFlushInMillis = currentTimeInMillis; - } - } - - private int getRandomHourBetweenRange(int minHour, int maxHour){ - return (int) (Math.random() * (maxHour - minHour +1) + minHour); - } - private void _sleepForPollPeriod() { try { _logger.info("Sleeping for {}s before checking bloom filter statistics.", POLL_INTERVAL_MS / 1000); @@ -293,4 +300,21 @@ private void _sleepForPollPeriod() { } } } + + private class BloomFilterFlushThread implements Runnable { + @Override + public void run() { + try{ + _flushBloomFilter(); + } catch (Exception ex) { + _logger.warn("Exception occurred while flushing bloom filter.", ex); + } + } + + private void _flushBloomFilter() { + _logger.info("Flushing out bloom filter entries"); + BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + randomNumber = rand.nextInt(); + } + } } diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 
df73d3833..b00ec935e 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -591,8 +591,10 @@ private void _upsert(List records) { recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _logger.info("{} records were not written to ES", recordsToRemove.size()); - records.removeAll(recordsToRemove); + if(recordsToRemove.size() != 0) { + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); + } } //add to bloom filter _addToBloomFilter(records); @@ -641,8 +643,10 @@ public void onSuccess(Response response) { recordsToRemove.add(msrList.getRecord(item.index._id)); } } - _logger.info("{} records were not written to ES", recordsToRemove.size()); - records.removeAll(recordsToRemove); + if(recordsToRemove.size() != 0) { + _logger.info("{} records were not written to ES", recordsToRemove.size()); + records.removeAll(recordsToRemove); + } } //add to bloom filter _addToBloomFilter(records); From df1c4c4e7719652a48cf229613ceba38f2ec6401 Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 12:09:51 -0700 Subject: [PATCH 13/19] Code cleanup --- .../service/schema/AbstractSchemaService.java | 47 +++++++------------ .../schema/ElasticSearchSchemaService.java | 2 +- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 8bb4ca388..d00f55329 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -28,22 +28,20 @@ import com.salesforce.dva.argus.system.SystemConfiguration; public abstract class AbstractSchemaService extends DefaultService implements SchemaService { - private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; - protected static BloomFilter BLOOMFILTER; - private static boolean _writesToBloomFilterEnabled = true; - private static Random rand = new Random(); - private static int randomNumber = rand.nextInt(); + + protected BloomFilter bloomFilter; + private Random rand = new Random(); + private int randomNumber = rand.nextInt(); private int bloomFilterExpectedNumberInsertions; private double bloomFilterErrorRate; private final Logger _logger = LoggerFactory.getLogger(getClass()); private final Thread _bloomFilterMonitorThread; - private final boolean _cacheEnabled; protected final boolean _syncPut; private int bloomFilterFlushHourToStartAt; private ScheduledExecutorService scheduledExecutorService; - + protected AbstractSchemaService(SystemConfiguration config) { super(config); @@ -51,17 +49,13 @@ protected AbstractSchemaService(SystemConfiguration config) { Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getDefaultValue())); bloomFilterErrorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(), Property.BLOOMFILTER_ERROR_RATE.getDefaultValue())); - BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); - _cacheEnabled = Boolean.parseBoolean( - 
config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue())); _syncPut = Boolean.parseBoolean( config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue())); _bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor"); - if(_cacheEnabled) { - _bloomFilterMonitorThread.start(); - } + _bloomFilterMonitorThread.start(); bloomFilterFlushHourToStartAt = Integer.parseInt(config.getValue(Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getName(), Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getDefaultValue())); @@ -81,20 +75,14 @@ public void put(List metrics) { requireNotDisposed(); SystemAssert.requireArgument(metrics != null, "Metric list cannot be null."); - //If cache is not enabled, call implementation specific put with the list of metrics. - if(!_cacheEnabled) { - implementationSpecificPut(metrics); - return; - } - - //If cache is enabled, create a list of metricsToPut that do not exist on the BLOOMFILTER and then call implementation + // Create a list of metricsToPut that do not exist on the BLOOMFILTER and then call implementation // specific put with only those subset of metricsToPut. List metricsToPut = new ArrayList<>(metrics.size()); for(Metric metric : metrics) { if(metric.getTags().isEmpty()) { String key = constructKey(metric, null); - boolean found = BLOOMFILTER.mightContain(key); + boolean found = bloomFilter.mightContain(key); if(!found) { metricsToPut.add(metric); } @@ -102,7 +90,7 @@ public void put(List metrics) { boolean newTags = false; for(Entry tagEntry : metric.getTags().entrySet()) { String key = constructKey(metric, tagEntry); - boolean found = BLOOMFILTER.mightContain(key); + boolean found = bloomFilter.mightContain(key); if(!found) { newTags = true; } @@ -224,14 +212,12 @@ private int getNumHoursUntilTargetHour(int targetHour){ * @author Bhinav Sura (bhinav.sura@salesforce.com) */ public enum Property { - - /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists, - * and if it does then do not rewrite. Provide more heap space when using this option. */ - CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"), SYNC_PUT("service.property.schema.sync.put", "false"), - BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"), + /* + * Have a different configured flush start hour for different machines to prevent thundering herd problem. + */ BLOOM_FILTER_FLUSH_HOUR_TO_START_AT("service.property.schema.bloomfilter.flush.hour.to.start.at","2"); private final String _name; @@ -286,8 +272,8 @@ public void run() { } private void _checkBloomFilterUsage() { - _logger.info("Bloom approx no. elements = {}", BLOOMFILTER.approximateElementCount()); - _logger.info("Bloom expected error rate = {}", BLOOMFILTER.expectedFpp()); + _logger.info("Bloom approx no. 
elements = {}", bloomFilter.approximateElementCount()); + _logger.info("Bloom expected error rate = {}", bloomFilter.expectedFpp()); } private void _sleepForPollPeriod() { @@ -313,7 +299,8 @@ public void run() { private void _flushBloomFilter() { _logger.info("Flushing out bloom filter entries"); - BLOOMFILTER = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions , bloomFilterErrorRate); + /* Don't need explicit synchronization to prevent slowness majority of the time*/ randomNumber = rand.nextInt(); } } diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index b00ec935e..75bc7d545 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -669,7 +669,7 @@ private void _addToBloomFilter(List records){ _logger.info("Adding {} records into bloom filter.", records.size()); for(MetricSchemaRecord record : records) { String key = constructKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); - BLOOMFILTER.put(key); + bloomFilter.put(key); } } From bd08b259aec587160f2df4a4870d80a5e5d5e49b Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 12:14:19 -0700 Subject: [PATCH 14/19] Code cleanup --- .../dva/argus/service/schema/AbstractSchemaService.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index d00f55329..187bbd1c6 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -29,8 +29,8 @@ public abstract class AbstractSchemaService extends DefaultService implements SchemaService { private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; - - + private static int DAY_IN_SECONDS = 24 * 60 * 60; + private static int HOUR_IN_SECONDS = 60 * 60; protected BloomFilter bloomFilter; private Random rand = new Random(); private int randomNumber = rand.nextInt(); @@ -183,9 +183,9 @@ protected String constructKey(String scope, String metric, String tagk, String t private void createScheduledExecutorService(int targetHourToStartAt){ scheduledExecutorService = Executors.newScheduledThreadPool(1); - int initialDelayInSeconds = getNumHoursUntilTargetHour(targetHourToStartAt) * 60 * 60; + int initialDelayInSeconds = getNumHoursUntilTargetHour(targetHourToStartAt) * HOUR_IN_SECONDS; BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread(); - scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, 24 * 60 *60, TimeUnit.SECONDS); + scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, DAY_IN_SECONDS, TimeUnit.SECONDS); } private void shutdownScheduledExecutorService(){ From 2a41332ea6c2626fe0d27471453262fda4abd6e5 Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 14:20:31 -0700 Subject: [PATCH 15/19] 
Fix some existing test failures --- .../service/schema/AbstractSchemaService.java | 6 ++-- .../schema/ElasticSearchSchemaService.java | 4 +-- .../schema/AbstractSchemaServiceTest.java | 33 ++++++++++++------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 187bbd1c6..fa7bb11ac 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -29,9 +29,9 @@ public abstract class AbstractSchemaService extends DefaultService implements SchemaService { private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L; - private static int DAY_IN_SECONDS = 24 * 60 * 60; - private static int HOUR_IN_SECONDS = 60 * 60; - protected BloomFilter bloomFilter; + private static final int DAY_IN_SECONDS = 24 * 60 * 60; + private static final int HOUR_IN_SECONDS = 60 * 60; + protected static BloomFilter bloomFilter; private Random rand = new Random(); private int randomNumber = rand.nextInt(); private int bloomFilterExpectedNumberInsertions; diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 75bc7d545..30f605333 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -218,7 +218,7 @@ protected void implementationSpecificPut(List metrics) { /* Convert the given list of metrics to a list of metric schema records. At the same time, fracture the records list * if its size is greater than INDEXING_BATCH_SIZE. 
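* For example, with the default _bulkIndexingSize of 10000, a put of 25000 schema
* records fractures into sublists of sizes 10000, 10000 and 5000, each indexed by
* its own _bulk request.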
*/ - private List> _fracture(List metrics) { + public List> _fracture(List metrics) { List> fracturedList = new ArrayList<>(); List records = new ArrayList<>(_bulkIndexingSize); @@ -665,7 +665,7 @@ public void onFailure(Exception e) { _esRestClient.performRequestAsync(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), entity, responseListener); } - private void _addToBloomFilter(List records){ + public void _addToBloomFilter(List records){ _logger.info("Adding {} records into bloom filter.", records.size()); for(MetricSchemaRecord record : records) { String key = constructKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace()); diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java index 55293b376..2ea27577d 100644 --- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java +++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java @@ -3,6 +3,8 @@ import static org.junit.Assert.assertTrue; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -16,6 +18,7 @@ import com.salesforce.dva.argus.AbstractTest; import com.salesforce.dva.argus.entity.Metric; +import com.salesforce.dva.argus.entity.MetricSchemaRecord; import com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService; @@ -34,11 +37,11 @@ public void testPutEverythingCached() { List metrics = createRandomMetrics("test-scope", "test-metric", 10); ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService()); - _enableCaching(service); final AtomicInteger count = new AtomicInteger(); ElasticSearchSchemaService spyService = _initializeSpyService(service, count); spyService.put(metrics); + spyService._addToBloomFilter(spyService._fracture(metrics).get(0)); assertTrue(count.get() == metrics.size()); spyService.put(metrics); assertTrue(count.get() == metrics.size()); @@ -53,11 +56,11 @@ public void testPutPartialCached() { total.addAll(newMetrics); ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService()); - _enableCaching(service); final AtomicInteger count = new AtomicInteger(); ElasticSearchSchemaService spyService = _initializeSpyService(service, count); spyService.put(metrics); + spyService._addToBloomFilter(spyService._fracture(metrics).get(0)); assertTrue(count.get() == metrics.size()); spyService.put(new ArrayList<>(total)); assertTrue(count.get() == total.size()); @@ -71,7 +74,6 @@ public void testPutNothingCached() { List newMetrics = createRandomMetrics("test-scope", "test-metric1", 5); ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService()); - _enableCaching(service); final AtomicInteger count = new AtomicInteger(); ElasticSearchSchemaService spyService = _initializeSpyService(service, count); @@ -111,17 +113,26 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return spyService; } - +/* private void _addToBloomFilter(ElasticSearchSchemaService service,List schemaRecords) { + try { + Method method= 
service.getClass().getDeclaredMethod("_addToBloomFilter",null); + method.setAccessible(true); + method.invoke(service, null); + } catch (NoSuchMethodException | InvocationTargetException | SecurityException | IllegalArgumentException | IllegalAccessException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }*/ - private void _enableCaching(ElasticSearchSchemaService service) { + +/* private void _fracture(ElasticSearchSchemaService service){ try { - Field field = service.getClass().getSuperclass().getDeclaredField("_cacheEnabled"); - field.setAccessible(true); - field.set(service, true); - } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) { + Method method= service.getClass().getDeclaredMethod("_fracture",List); + method.setAccessible(true); + method.invoke(service, null); + } catch (NoSuchMethodException | InvocationTargetException | SecurityException | IllegalArgumentException | IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } - } - + }*/ } From 97abd823d8ec34c50db2166fde43905797fc05f6 Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 16:20:54 -0700 Subject: [PATCH 16/19] Modify and add new tests --- .../service/schema/AbstractSchemaService.java | 2 +- .../schema/ElasticSearchSchemaService.java | 4 +- .../schema/AbstractSchemaServiceTest.java | 62 ++++++------------- 3 files changed, 22 insertions(+), 46 deletions(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index fa7bb11ac..2eecfc7a0 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -199,7 +199,7 @@ private void shutdownScheduledExecutorService(){ } } - private int getNumHoursUntilTargetHour(int targetHour){ + protected int getNumHoursUntilTargetHour(int targetHour){ _logger.info("Initialized bloom filter flushing out, at {} hour of day", targetHour); Calendar calendar = Calendar.getInstance(); int hour = calendar.get(Calendar.HOUR_OF_DAY); diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java index 30f605333..f104dd37c 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java @@ -218,7 +218,7 @@ protected void implementationSpecificPut(List metrics) { /* Convert the given list of metrics to a list of metric schema records. At the same time, fracture the records list * if its size is greater than INDEXING_BATCH_SIZE. 
From d5c1c8091a8197469fa3d3c62c3cd4afc82dda86 Mon Sep 17 00:00:00 2001
From: Dilip
Date: Wed, 6 Jun 2018 17:58:33 -0700
Subject: [PATCH 17/19] Change to ElasticSearchSchemaService as default schema service

---
 .../com/salesforce/dva/argus/system/SystemConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java
index 35938fe99..853155484 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java
@@ -252,7 +252,7 @@ public enum Property {
     AUTH_SERVICE_IMPL_CLASS("service.binding.auth", "com.salesforce.dva.argus.service.auth.LDAPAuthService"),
     AUTH_SERVICE_PROPERTY_FILE("service.config.auth","argus.properties"),

-    SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.AsyncHbaseSchemaService"),
+    SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService"),
     SCHEMA_SERVICE_PROPERTY_FILE("service.config.schema","argus.properties"),

     HISTORY_SERVICE_IMPL_CLASS("service.binding.history", "com.salesforce.dva.argus.service.history.HBaseHistoryService"),
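Since PATCH 19 below reverts this default, it is worth noting that an individual deployment can still opt into the Elasticsearch-backed schema service without a code change. Assuming the service.binding.schema key is honored from the deployment's configuration file, as the SCHEMA_SERVICE_PROPERTY_FILE entry above suggests, a hypothetical override would be:

    # Deployment-specific override in argus.properties (illustrative only;
    # key and class name are taken from the diff above)
    service.binding.schema=com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService

This keeps the compiled default conservative while letting individual clusters migrate on their own schedule.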
From 1ba2e308dfd0d6184485ebc0601c6de4c6196165 Mon Sep 17 00:00:00 2001
From: Dilip
Date: Wed, 6 Jun 2018 18:12:01 -0700
Subject: [PATCH 18/19] Reduce default bloom filter heap size for tests to run on Travis

---
 .../dva/argus/service/schema/AbstractSchemaService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
index 2eecfc7a0..ffb87f857 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
@@ -213,7 +213,7 @@
 	 */
 	public enum Property {
 		SYNC_PUT("service.property.schema.sync.put", "false"),
-		BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"),
+		BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "40"),
 		BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"),

 		/*
 		 * Have a different configured flush start hour for different machines to prevent thundering herd problem.
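For context on why Travis needed this change: a bloom filter's backing bit array is sized by the standard formula m = -n * ln(p) / (ln 2)^2. With the previous defaults of n = 400,000,000 expected insertions and error rate p = 0.00001, that works out to roughly

    m = 400,000,000 * 11.51 / 0.4805
      ≈ 9.6e9 bits
      ≈ 1.2 GB

of heap for the bit array alone — more than a stock Travis CI JVM provides, which is presumably why the build was failing. These are back-of-the-envelope figures, but dropping the test default to 40 makes the filter's footprint negligible; production deployments are expected to set the real value through the BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS property.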
"com.salesforce.dva.argus.service.auth.LDAPAuthService"), AUTH_SERVICE_PROPERTY_FILE("service.config.auth","argus.properties"), - SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.AsyncHbaseSchemaService"), + SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService"), SCHEMA_SERVICE_PROPERTY_FILE("service.config.schema","argus.properties"), HISTORY_SERVICE_IMPL_CLASS("service.binding.history", "com.salesforce.dva.argus.service.history.HBaseHistoryService"), From 1ba2e308dfd0d6184485ebc0601c6de4c6196165 Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 18:12:01 -0700 Subject: [PATCH 18/19] Reduce default bloom filter heap size for tests to run on Travis --- .../dva/argus/service/schema/AbstractSchemaService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java index 2eecfc7a0..ffb87f857 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java @@ -213,7 +213,7 @@ protected int getNumHoursUntilTargetHour(int targetHour){ */ public enum Property { SYNC_PUT("service.property.schema.sync.put", "false"), - BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "400000000"), + BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "40"), BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"), /* * Have a different configured flush start hour for different machines to prevent thundering herd problem. From 5d5393f018f9f159c1f40d2d84467645aaac8ede Mon Sep 17 00:00:00 2001 From: Dilip Date: Wed, 6 Jun 2018 18:39:12 -0700 Subject: [PATCH 19/19] Revert schema default config change --- .../com/salesforce/dva/argus/system/SystemConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java index 853155484..35938fe99 100644 --- a/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java +++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/system/SystemConfiguration.java @@ -252,7 +252,7 @@ public enum Property { AUTH_SERVICE_IMPL_CLASS("service.binding.auth", "com.salesforce.dva.argus.service.auth.LDAPAuthService"), AUTH_SERVICE_PROPERTY_FILE("service.config.auth","argus.properties"), - SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService"), + SCHEMA_SERVICE_IMPL_CLASS("service.binding.schema", "com.salesforce.dva.argus.service.schema.AsyncHbaseSchemaService"), SCHEMA_SERVICE_PROPERTY_FILE("service.config.schema","argus.properties"), HISTORY_SERVICE_IMPL_CLASS("service.binding.history", "com.salesforce.dva.argus.service.history.HBaseHistoryService"),