Skip to content

Commit

Permalink
Avoid calling RegularStatement.toString() to de-duplicate the insert …
Browse files Browse the repository at this point in the history
…statements and introduce additional options to control what gets indexed.
  • Loading branch information
Jesse White committed Jun 2, 2016
1 parent 787bd4e commit b6eebfd
Show file tree
Hide file tree
Showing 8 changed files with 529 additions and 100 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2016, The OpenNMS Group
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opennms.newts.cassandra.search;

import javax.inject.Inject;
import javax.inject.Named;

/**
 * Immutable set of options controlling what the Cassandra indexer stores
 * (hierarchical index entries, default-term entries, resource-term entries)
 * and how many insert statements may be grouped into a single batch.
 *
 * <p>Instances are created either through the {@link Builder}, or injected
 * with only the hierarchical-indexing flag, in which case every other option
 * keeps its default value.</p>
 */
public class CassandraIndexingOptions {

    private final boolean m_enableHierarchicalIndexing;
    private final boolean m_indexUsingDefaultTerm;
    private final boolean m_indexResourceTerms;

    private final int m_maxBatchSize;

    /** Fluent builder; all options default to enabled and a batch size of 16. */
    public static class Builder {
        private int maxBatchSize = 16;
        private boolean enableHierarchicalIndexing = true;
        private boolean indexUsingDefaultTerm = true;
        private boolean indexResourceTerms = true;

        public Builder withHierarchicalIndexing(boolean enableHierarchicalIndexing) {
            this.enableHierarchicalIndexing = enableHierarchicalIndexing;
            return this;
        }

        public Builder withIndexUsingDefaultTerm(boolean indexUsingDefaultTerm) {
            this.indexUsingDefaultTerm = indexUsingDefaultTerm;
            return this;
        }

        public Builder withIndexResourceTerms(boolean indexResourceTerms) {
            this.indexResourceTerms = indexResourceTerms;
            return this;
        }

        /**
         * Sets the maximum number of statements grouped into a single batch.
         *
         * @param maxBatchSize maximum batch size, must be strictly positive
         * @throws IllegalArgumentException if {@code maxBatchSize} is less than 1,
         *         since a non-positive size cannot be used to partition statements
         *         into batches
         */
        public Builder withMaxBatchSize(int maxBatchSize) {
            if (maxBatchSize < 1) {
                throw new IllegalArgumentException("maxBatchSize must be strictly positive, got: " + maxBatchSize);
            }
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public CassandraIndexingOptions build() {
            return new CassandraIndexingOptions(this);
        }
    }

    public CassandraIndexingOptions(CassandraIndexingOptions.Builder builder) {
        m_maxBatchSize = builder.maxBatchSize;
        m_enableHierarchicalIndexing = builder.enableHierarchicalIndexing;
        m_indexUsingDefaultTerm = builder.indexUsingDefaultTerm;
        m_indexResourceTerms = builder.indexResourceTerms;
    }

    // NOTE(review): "hierarical" below looks like a typo for "hierarchical",
    // but the @Named value is an external configuration key -- renaming it
    // would silently break existing bindings, so it is left unchanged.
    @Inject
    public CassandraIndexingOptions(@Named("search.hierarical-indexing") boolean enableHierarchicalIndexing) {
        this(new Builder().withHierarchicalIndexing(enableHierarchicalIndexing));
    }

    public boolean isHierarchicalIndexingEnabled() {
        return m_enableHierarchicalIndexing;
    }

    public boolean shouldIndexUsingDefaultTerm() {
        return m_indexUsingDefaultTerm;
    }

    public boolean shouldIndexResourceTerms() {
        return m_indexResourceTerms;
    }

    public int getMaxBatchSize() {
        return m_maxBatchSize;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2016, The OpenNMS Group
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opennms.newts.cassandra.search.support;

import org.opennms.newts.api.Context;

import com.datastax.driver.core.Statement;

/**
 * Used to group and de-duplicate statements before they are generated and executed.
 *
 * @author jwhite
 */
public interface StatementGenerator {

    /**
     * Returns a key which can be used to group statements into batches, or null
     * if the statement should never be batched.
     *
     * @return key or null
     */
    String getKey();

    /**
     * Returns the context with which this statement is associated.
     *
     * @return context
     */
    Context getContext();

    /**
     * Generates the statement.
     *
     * @return statement
     */
    Statement toStatement();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.opennms.newts.cassandra.search.support;

import static com.datastax.driver.core.querybuilder.QueryBuilder.unloggedBatch;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.opennms.newts.cassandra.ContextConfigurations;

import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Statement;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * Static helpers for expanding {@link StatementGenerator}s into the list of
 * (possibly batched) statements to execute.
 */
public class StatementUtils {

    // Utility class: static methods only, never instantiated.
    private StatementUtils() {
    }

    /**
     * Converts the given generators into executable statements, applying the
     * per-context write consistency level to each one. Statements that share
     * a non-null key are consolidated into unlogged batches of at most
     * {@code maxBatchSize} entries; statements with a null key are returned
     * individually.
     *
     * @param contextConfigurations source of the write consistency level per context
     * @param maxBatchSize upper bound on the number of statements per batch
     * @param generators statement generators to expand (already de-duplicated by the caller)
     * @return the statements to execute, batched where possible
     */
    public static List<Statement> getStatements(ContextConfigurations contextConfigurations, int maxBatchSize,
            Set<StatementGenerator> generators) {
        List<Statement> statementsToExecute = Lists.newArrayList();

        Map<String, List<Statement>> statementsByKey = Maps.newHashMap();
        for (StatementGenerator generator : generators) {
            Statement statement = generator.toStatement()
                    .setConsistencyLevel(contextConfigurations.getWriteConsistency(generator.getContext()));
            String key = generator.getKey();
            if (key == null) {
                // A null key means this statement must never be batched
                statementsToExecute.add(statement);
                continue;
            }

            // Group the remaining statements by key
            List<Statement> statementsForKey = statementsByKey.get(key);
            if (statementsForKey == null) {
                statementsForKey = Lists.newArrayList();
                statementsByKey.put(key, statementsForKey);
            }
            statementsForKey.add(statement);
        }

        // Consolidate the grouped statements into bounded unlogged batches.
        // NOTE(review): the toArray cast assumes every batched statement is a
        // RegularStatement; a BoundStatement here would raise ArrayStoreException
        // at runtime -- confirm generators only emit RegularStatements.
        for (List<Statement> statementsForKey : statementsByKey.values()) {
            for (List<Statement> partition : Lists.partition(statementsForKey, maxBatchSize)) {
                statementsToExecute.add(unloggedBatch(partition.toArray(new RegularStatement[partition.size()])));
            }
        }

        return statementsToExecute;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ public void test() {
MetricRegistry registry = new MetricRegistry();
ContextConfigurations contextConfigurations = new ContextConfigurations();

Indexer indexer = new CassandraIndexer(session, 86400, mockCache, registry, false, new SimpleResourceIdSplitter(), contextConfigurations);
CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(false).build();

Indexer indexer = new CassandraIndexer(session, 86400, mockCache, registry, options, new SimpleResourceIdSplitter(), contextConfigurations);

indexer.update(samples);

Expand Down Expand Up @@ -143,7 +146,10 @@ public void testDelete() {

CassandraSession session = newtsInstance.getCassandraSession();

Indexer indexer = new CassandraIndexer(session, 86400, cache, registry, false, new SimpleResourceIdSplitter(), contextConfigurations);
CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(false).build();

Indexer indexer = new CassandraIndexer(session, 86400, cache, registry, options, new SimpleResourceIdSplitter(), contextConfigurations);
CassandraSearcher searcher = new CassandraSearcher(session, registry, contextConfigurations);

Map<String, String> base = map("meat", "people", "bread", "beer");
Expand Down Expand Up @@ -181,7 +187,10 @@ public void canWalkTheResourceTree() {
MetricRegistry registry = new MetricRegistry();
ContextConfigurations contextConfigurations = new ContextConfigurations();

Indexer indexer = new CassandraIndexer(session, 86400, mockCache, registry, true, new SimpleResourceIdSplitter(), contextConfigurations);
CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(true).build();

Indexer indexer = new CassandraIndexer(session, 86400, mockCache, registry, options, new SimpleResourceIdSplitter(), contextConfigurations);

indexer.update(samples);

Expand Down Expand Up @@ -251,7 +260,10 @@ public void testCache() {
MetricRegistry registry = new MetricRegistry();
ContextConfigurations contextConfigurations = new ContextConfigurations();

Indexer indexer = new CassandraIndexer(newtsInstance.getCassandraSession(), 86400, cache, registry, false, new SimpleResourceIdSplitter(), contextConfigurations);
CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(false).build();

Indexer indexer = new CassandraIndexer(newtsInstance.getCassandraSession(), 86400, cache, registry, options, new SimpleResourceIdSplitter(), contextConfigurations);

Sample s = sampleFor(new Resource("aaa", Optional.of(map("beverage", "beer"))), "m0");
indexer.update(Collections.singletonList(s));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2016, The OpenNMS Group
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opennms.newts.cassandra.search;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

import org.junit.Test;
import org.opennms.newts.api.Counter;
import org.opennms.newts.api.MetricType;
import org.opennms.newts.api.Resource;
import org.opennms.newts.api.Sample;
import org.opennms.newts.api.Timestamp;
import org.opennms.newts.cassandra.CassandraSession;
import org.opennms.newts.cassandra.ContextConfigurations;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.collect.Lists;

/**
 * Stress test: indexes a large number of resources against a mocked Cassandra
 * session and prints the elapsed time per indexing pass. The session mock
 * returns a canned future, so this exercises statement generation and the
 * metadata cache rather than actual I/O.
 */
public class CassandraIndexerStressITCase {

    @Test
    public void canIndexManyResources() {
        final int numResources = 20000;
        final int numSamplesPerResource = 3;

        // Setup the indexer with a session whose executeAsync() always
        // succeeds immediately
        ResultSetFuture future = mock(ResultSetFuture.class);
        CassandraSession session = mock(CassandraSession.class);
        when(session.executeAsync(any(Statement.class))).thenReturn(future);

        ContextConfigurations contexts = new ContextConfigurations();
        MetricRegistry metrics = new MetricRegistry();

        CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
                .withHierarchicalIndexing(true).build();

        ResourceIdSplitter resourceIdSplitter = new EscapableResourceIdSplitter();
        // Cache is sized at twice the resource count so no entries are evicted
        GuavaResourceMetadataCache cache = new GuavaResourceMetadataCache(numResources * 2, metrics);
        CassandraIndexer indexer = new CassandraIndexer(session, 0, cache, metrics, options,
                resourceIdSplitter, contexts);

        // Generate the resources and sample sets
        Resource resources[] = new Resource[numResources];
        List<List<Sample>> sampleSets = Lists.newArrayListWithCapacity(numResources);
        System.out.println("Building sample sets...");
        for (int i = 0; i < numResources; i++) {
            resources[i] = new Resource(String.format("snmp:%d:eth0-x:ifHcInOctets", i));
            List<Sample> samples = Lists.newArrayListWithCapacity(numSamplesPerResource);
            for (int j = 0; j < numSamplesPerResource; j++) {
                samples.add(new Sample(Timestamp.now(), resources[i], "y" + j, MetricType.COUNTER, new Counter(i * j)));
            }
            sampleSets.add(samples);
        }
        System.out.println("Done building sample sets.");

        // Index the resources and associated samples several times over;
        // passes after the first should be faster since the cache is warm
        for (int k = 0; k < 3; k++) {
            System.out.println("Indexing samples sets...");
            long start = System.currentTimeMillis();
            for (List<Sample> sampleSet : sampleSets) {
                indexer.update(sampleSet);
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println("Done indexing samples in : " + elapsed + " ms");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.opennms.newts.cassandra.ContextConfigurations;

import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.collect.Lists;
Expand All @@ -42,9 +45,20 @@ public void insertStatementsAreDeduplicatedWhenIndexingManySamples() {
ArgumentCaptor<Statement> statementCaptor = ArgumentCaptor.forClass(Statement.class);
when(session.executeAsync(statementCaptor.capture())).thenReturn(mock(ResultSetFuture.class));

PreparedStatement statement = mock(PreparedStatement.class);
BoundStatement boundStatement = mock(BoundStatement.class);
when(session.prepare(any(RegularStatement.class))).thenReturn(statement);
when(statement.bind()).thenReturn(boundStatement);
when(boundStatement.setString(any(String.class), any(String.class))).thenReturn(boundStatement);

CassandraIndexingOptions options = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(true)
// Limit the batch size so we can accurately count the number of statements
.withMaxBatchSize(1).build();

MetricRegistry registry = new MetricRegistry();
GuavaResourceMetadataCache cache = new GuavaResourceMetadataCache(2048, registry);
CassandraIndexer indexer = new CassandraIndexer(session, 0, cache, registry, true,
CassandraIndexer indexer = new CassandraIndexer(session, 0, cache, registry, options,
new EscapableResourceIdSplitter(), new ContextConfigurations());

Resource r = new Resource("snmp:1589:vmware5Cpu:2:vmware5Cpu");
Expand All @@ -64,8 +78,7 @@ public void insertStatementsAreDeduplicatedWhenIndexingManySamples() {
// Index the collection of samples
indexer.update(samples);

// Verify that exectuteAsync was called exactly twice. The statements are executed
// in bounded batches, so this give us an upper limit on the number of inserts.
verify(session, times(2)).executeAsync(any(Statement.class));
// Verify the number of executeAsync calls
verify(session, times(20)).executeAsync(any(Statement.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opennms.newts.cassandra.ContextConfigurations;
import org.opennms.newts.cassandra.search.CassandraIndexer;
import org.opennms.newts.cassandra.search.CassandraIndexerSampleProcessor;
import org.opennms.newts.cassandra.search.CassandraIndexingOptions;
import org.opennms.newts.cassandra.search.EscapableResourceIdSplitter;
import org.opennms.newts.cassandra.search.GuavaResourceMetadataCache;
import org.opennms.newts.cassandra.search.ResourceIdSplitter;
Expand Down Expand Up @@ -81,8 +82,11 @@ class InsertDispatcher extends Dispatcher {
if (m_config.isSearchEnabled()) {
ResourceIdSplitter resourceIdSplitter = new EscapableResourceIdSplitter();
GuavaResourceMetadataCache cache = new GuavaResourceMetadataCache(m_config.getNumResources(), metrics);
CassandraIndexingOptions indexingOptions = new CassandraIndexingOptions.Builder()
.withHierarchicalIndexing(m_config.isHierarchicalIndexingEnabled())
.withMaxBatchSize(m_config.getBatchSize()).build();
CassandraIndexer cassandraIndexer = new CassandraIndexer(session, Config.CASSANDRA_TTL,
cache, metrics, m_config.isHierarchicalIndexingEnabled(), resourceIdSplitter, contexts);
cache, metrics, indexingOptions, resourceIdSplitter, contexts);
CassandraIndexerSampleProcessor indexerSampleProcessor = new CassandraIndexerSampleProcessor(cassandraIndexer);
processors.add(indexerSampleProcessor);
}
Expand Down

0 comments on commit b6eebfd

Please sign in to comment.