Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulo Storage Backend #377

Open
wants to merge 72 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
f34200b
Inital copy of titan-hbase to titan-accumulo.
Jun 5, 2013
62b4726
Skeleton accumulo implementation, fails all tests.
Jun 5, 2013
14ed3fb
First version titan accumulo back-end.
Jun 7, 2013
03766e0
Hard-wired local accumulo instance, passes all unit tests except Accu…
Jun 10, 2013
5a3609f
Passes all unit tests except AccumuloGraphConcurrentTest with hard-wi…
Jun 12, 2013
7e1473a
Removed extraneous configuration parameters from test harness.
Jun 12, 2013
6b905bf
Minor clean up.
Jun 12, 2013
a345911
Changed Lucene dependency to 4.2.1.
Jun 13, 2013
015ed2a
Fixed HOSTNAME in Accumulo back-end configuration.
Jun 14, 2013
1c2a297
Add titan-accumulo, titan-lucene to Titan standalone.
Jun 14, 2013
c70055f
Use Key ranges for query slices.
Jun 14, 2013
5bde1f2
Remove HBase test resources.
Jun 17, 2013
5855507
Add GraphOfTheGods example as Accumulo/Lucene graph.
Jun 17, 2013
dc12719
Remove unused Bytes class.
Jun 17, 2013
60282bc
Configured unit test for mock Accumulo.
Jun 20, 2013
1788d5b
Storage configuration persisted in Accumulo table.
Jun 21, 2013
a10e476
Light cleanup.
Jun 21, 2013
64940d1
Cleanup package imports.
Jun 24, 2013
1aad34c
Cleaned up GraphOfTheGods example.
Jun 28, 2013
071998b
Removed redundant host name configuration in graph open.
Jul 9, 2013
ccdcdbd
Removed unused properties member in AccumuloStoreManager.
Jul 28, 2013
0634ef0
Merge remote-tracking branch 'aurelius/master' into titan-accumulo
Sep 11, 2013
ceb8457
Implemented new storage API methods, passes unit tests.
Sep 17, 2013
8120f13
Refactored unit tests to use Accumulo instance injector in store mana…
Sep 17, 2013
1faf6c3
Add Accumulo instance injector interface.
Sep 17, 2013
ed1f30a
Refactored Accumulo store manager and value store, removed cleanup co…
Sep 17, 2013
1481288
Refactor getSlice in AccumuloKeyColumnValueStore.
Sep 18, 2013
93e0e42
Refactored getSlice in AccumuloKeyColumnValueStore to use executor se…
Sep 18, 2013
bdad550
Code clean up and documentation. Passes all unit tests except concurr…
Sep 20, 2013
c4df965
Restore mock Accumulo instance for unit tests in AccumuloStorageConfi…
Sep 20, 2013
9d8fe53
Merge remote-tracking branch 'aurelius/master' into titan-accumulo
Sep 20, 2013
3ffc740
Added titan-accumulo to titan-dist.
Sep 22, 2013
de661b2
Change executor service to immediate shut down in ConcurrentLists.
Sep 24, 2013
e7998d8
Expose AccumuloKeyColumnValueStore.getColumnSliceIterator(SliceQuery)…
Sep 26, 2013
ff8cd2e
Merge branch 'master' of https://github.com/thinkaurelius/titan into …
Oct 10, 2013
623f9ca
Merge branch 'master' of https://github.com/thinkaurelius/titan into …
Oct 11, 2013
09785f1
Update Accumulo storage backend to current Titan master 0.4.0 snapsho…
Oct 11, 2013
9adaed6
Fixed scanner range in store getKeys, unit tests now pass.
Oct 11, 2013
d3d18b2
Added titan-accumulo to jre7 profile.
Oct 11, 2013
6a55b41
Merge branch 'titan-0.4.0' into titan-accumulo
Oct 18, 2013
7657490
Update build files for Titan 0.4.0 release.
Oct 18, 2013
6241848
Merge branch 'master' into titan-accumulo
Oct 30, 2013
97cc600
Merge branch 'master' into titan-accumulo
Nov 1, 2013
cb13941
Updated version to 0.4.1-SNAPSHOT.
Nov 1, 2013
4ea0e01
Updated Titan-Dist-Accumulo to version 0.4.1-SNAPSHOT.
Nov 1, 2013
e5cf219
Merge branch 'master' into titan-accumulo
Nov 4, 2013
e6ccabe
Added supportsMultiQuery = true to Accumulo features.
Nov 4, 2013
d28b092
Merge branch 'master' into titan-accumulo
Nov 5, 2013
d35dc6e
Merge branch 'master' into titan-accumulo
Nov 12, 2013
cb5fd0b
Added Accumulo ID allocation test.
Nov 12, 2013
cb23da5
Accumulo username/password configuration moved to DistributedStoreMan…
Nov 12, 2013
6f9daec
Merge branch 'master' into titan-accumulo
Nov 19, 2013
61fbe54
Merge commit '2c010b7524a139c5e30fe9e9245b9bfa87fde059' into titan-ac…
Dec 2, 2013
9bda6f8
Move up Titan Accumulo version to 0.4.1 tag.
Dec 2, 2013
8899f92
Merge branch 'master' into titan-accumulo
Dec 2, 2013
355c62d
Move up Titan Accumulo version to 0.4.2-SNAPSHOT.
Dec 2, 2013
fbce20b
Added titan-test module as test dependency for titan-accumulo-iterators.
Dec 3, 2013
caed049
Merge branch 'titan-accumulo-0.4.1' into titan-accumulo
Dec 3, 2013
80c8ce0
Merge branch 'master' into titan-accumulo
Dec 16, 2013
fd0164a
Merge branch 'master' into titan-accumulo
Dec 19, 2013
3e68e7b
Merge branch 'master' into titan-accumulo
Dec 21, 2013
3e4d21e
Merge branch 'master' into titan-accumulo
Dec 24, 2013
8bc92e6
Corrected Accumulo Titan distro project so it mimics HBase distro.
Jan 6, 2014
1cdc8a6
Update Accumulo to version 1.4.4.
Jan 6, 2014
ba00005
Force Titan Accumulo distro to use Thrift 0.6.1.
Jan 6, 2014
fb30f19
Merge branch 'titan-accumulo-0.4.1' into titan-accumulo
Jan 6, 2014
367d924
Merge commit '7df580ea559c79de0adcf71089b667ff6e2ae9e9' into titan-ac…
Jan 13, 2014
786c197
Move Titan version to 0.4.2.
Jan 13, 2014
2878f6b
Shorten Accumulo configuration namespace key.
Jan 15, 2014
8b54209
Merge branch 'titan-accumulo-0.4.2' into titan-accumulo
Jan 15, 2014
0d52b78
Merge branch 'master' into titan-accumulo
Jan 15, 2014
11a1abe
Move up Titan Accumulo version to 0.4.3-SNAPSHOT.
Jan 15, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
<module>titan-berkeleyje</module>
<module>titan-cassandra</module>
<module>titan-hbase</module>
<module>titan-accumulo</module>
<module>titan-es</module>
<module>titan-lucene</module>
<module>titan-persistit</module>
Expand Down
23 changes: 23 additions & 0 deletions titan-accumulo/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan</artifactId>
<version>0.4.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>titan-accumulo</artifactId>
<packaging>pom</packaging>
<name>Titan-Accumulo: Distributed Graph Database</name>
<url>http://thinkaurelius.github.com/titan/</url>

<properties>
<accumulo.version>1.4.4</accumulo.version>
</properties>

<modules>
<module>titan-accumulo-iterators</module>
<module>titan-accumulo-core</module>
</modules>
</project>
95 changes: 95 additions & 0 deletions titan-accumulo/titan-accumulo-core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-accumulo</artifactId>
<version>0.4.3-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>titan-accumulo-core</artifactId>
<name>Titan-Accumulo: Graph Database Core</name>
<dependencies>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-accumulo-iterators</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
<exclusions>
<exclusion>
<artifactId>libthrift</artifactId>
<groupId>org.apache.thrift</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-start</artifactId>
<version>${accumulo.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging-api</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.6.1</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>test-compile</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<excludeGroupIds>com.thinkaurelius.titan</excludeGroupIds>
<outputDirectory>target/test-lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package com.thinkaurelius.titan.diskstorage.accumulo;

import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;

/**
* Configure Accumulo 1.4.3 batch scanners, writers and deleters.
*
* Adapted from org.apache.accumulo.core.client.BatchWriterConfig.
*
* @author Etienne Deprit <[email protected]>
*/
public class AccumuloBatchConfiguration {

private static final Integer DEFAULT_NUM_QUERY_THREADS = 3;
private Integer numQueryThreads = null;
private static final Long DEFAULT_MAX_MEMORY = 50 * 1024 * 1024l;
private Long maxMemory = null;
private static final Long DEFAULT_MAX_LATENCY = 2 * 60 * 1000l;
private Long maxLatency = null;
private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
private Long timeout = null;
private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
private Integer maxWriteThreads = null;

/**
* Sets the number of threads to spawn for querying tablet servers.
*
* <p>
* <b>Default:</b> 3
*
* @param numQueryThreads the number threads to use
* @throws IllegalArgumentException if {@code maxWriteThreads} is
* non-positive
* @return {@code this} to allow chaining of set methods
*/
public AccumuloBatchConfiguration setNumQueryThreads(int numQueryThreads) {
if (numQueryThreads <= 0) {
throw new IllegalArgumentException("Num threads must be positive " + numQueryThreads);
}

this.numQueryThreads = numQueryThreads;
return this;
}

/**
* Sets the maximum memory to batch before writing. The smaller this value,
* the more frequently the {@link BatchWriter} will write.<br />
* If set to a value smaller than a single mutation, then it will
* {@link BatchWriter#flush()} after each added mutation. Must be
* non-negative.
*
* <p>
* <b>Default:</b> 50M
*
* @param maxMemory max size in bytes
* @throws IllegalArgumentException if {@code maxMemory} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public AccumuloBatchConfiguration setMaxMemory(long maxMemory) {
if (maxMemory < 0) {
throw new IllegalArgumentException("Max memory must be non-negative.");
}
this.maxMemory = maxMemory;
return this;
}

/**
* Sets the maximum amount of time to hold the data in memory before
* flushing it to servers.<br />
* For no maximum, set to zero, or {@link Long#MAX_VALUE} with
* {@link TimeUnit#MILLISECONDS}.
*
* <p> {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be
* truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
* If this truncation would result in making the value zero when it was
* specified as non-zero, then a minimum value of one
* {@link TimeUnit#MILLISECONDS} will be used.
*
* <p>
* <b>Default:</b> 120 seconds
*
* @param maxLatency the maximum latency, in the unit specified by the value
* of {@code timeUnit}
* @param timeUnit determines how {@code maxLatency} will be interpreted
* @throws IllegalArgumentException if {@code maxLatency} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public AccumuloBatchConfiguration setMaxLatency(long maxLatency, TimeUnit timeUnit) {
if (maxLatency < 0) {
throw new IllegalArgumentException("Negative max latency not allowed " + maxLatency);
}

if (maxLatency == 0) {
this.maxLatency = Long.MAX_VALUE;
} else { // make small, positive values that truncate to 0 when converted use the minimum millis instead
this.maxLatency = Math.max(1, timeUnit.toMillis(maxLatency));
}
return this;
}

/**
* Sets the maximum amount of time an unresponsive server will be re-tried.
* When this timeout is exceeded, the {@link BatchWriter} should throw an
* exception.<br />
* For no timeout, set to zero, or {@link Long#MAX_VALUE} with
* {@link TimeUnit#MILLISECONDS}.
*
* <p> {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be
* truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br />
* If this truncation would result in making the value zero when it was
* specified as non-zero, then a minimum value of one
* {@link TimeUnit#MILLISECONDS} will be used.
*
* <p>
* <b>Default:</b> {@link Long#MAX_VALUE} (no timeout)
*
* @param timeout the timeout, in the unit specified by the value of
* {@code timeUnit}
* @param timeUnit determines how {@code timeout} will be interpreted
* @throws IllegalArgumentException if {@code timeout} is less than 0
* @return {@code this} to allow chaining of set methods
*/
public AccumuloBatchConfiguration setTimeout(long timeout, TimeUnit timeUnit) {
if (timeout < 0) {
throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
}

if (timeout == 0) {
this.timeout = Long.MAX_VALUE;
} else { // make small, positive values that truncate to 0 when converted use the minimum millis instead
this.timeout = Math.max(1, timeUnit.toMillis(timeout));
}
return this;
}

/**
* Sets the maximum number of threads to use for writing data to the tablet
* servers.
*
* <p>
* <b>Default:</b> 3
*
* @param maxWriteThreads the maximum threads to use
* @throws IllegalArgumentException if {@code maxWriteThreads} is
* non-positive
* @return {@code this} to allow chaining of set methods
*/
public AccumuloBatchConfiguration setMaxWriteThreads(int maxWriteThreads) {
if (maxWriteThreads <= 0) {
throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
}

this.maxWriteThreads = maxWriteThreads;
return this;
}

/**
* Get number of threads to spawn for querying on tablet servers.
*
* @return number of threads
*/
public int getNumQueryThreads() {
return numQueryThreads != null ? numQueryThreads : DEFAULT_NUM_QUERY_THREADS;
}

/**
* Sets the maximum memory to batch before writing.
*
* @return max memory
*/
public long getMaxMemory() {
return maxMemory != null ? maxMemory : DEFAULT_MAX_MEMORY;
}

/**
* Sets the maximum amount of time to hold the data in memory before flushing it to servers.
*
* @param timeUnit units for return value
* @return max latency
*/
public long getMaxLatency(TimeUnit timeUnit) {
return timeUnit.convert(maxLatency != null ? maxLatency : DEFAULT_MAX_LATENCY, TimeUnit.MILLISECONDS);
}

/**
* Get the maximum amount of time an unresponsive server will be re-tried.
*
* @param timeUnit units for return value
* @return max timeout
*/
public long getTimeout(TimeUnit timeUnit) {
return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
}

/**
* Get the maximum number of threads to use for writing data to the tablet servers.
*
* @return max threads
*/
public int getMaxWriteThreads() {
return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
}

/**
* Factory method to create a BatchDeleter connected to Accumulo.
*
* @param connector connection to Accumulo
* @param tableName the name of the table to query and delete from
* @param authorizations set of authorization labels that will be checked against the column visibility.
* @return BatchDeleter object for configuring and deleting
* @throws TableNotFoundException when the specified table doesn't exist
*/
public BatchDeleter createBatchDeleter(Connector connector, String tableName, Authorizations authorizations) throws TableNotFoundException {
return connector.createBatchDeleter(tableName, authorizations,
getNumQueryThreads(), getMaxMemory(), getMaxLatency(TimeUnit.MILLISECONDS), getMaxWriteThreads());
}

/**
* Factory method to create a BatchScanner connected to Accumulo.
*
* @param connector connection to Accumulo
* @param tableName the name of the table to query and delete from
* @param authorizations set of authorization labels that will be checked against the column visibility.
* @return BatchScanner object for configuring and querying
* @throws TableNotFoundException when the specified table doesn't exist
*/
public BatchScanner createBatchScanner(Connector connector, String tableName, Authorizations authorizations) throws TableNotFoundException {
return connector.createBatchScanner(tableName, authorizations, getNumQueryThreads());
}

/**
* Factory method to create a BatchWriter connected to Accumulo.
*
* @param connector connection to Accumulo
* @param tableName the name of the table to insert data into
* @return BatchWriter object for configuring and writing data
* @throws TableNotFoundException when the specified table doesn't exist
*/
public BatchWriter createBatchWriter(Connector connector, String tableName) throws TableNotFoundException {
return connector.createBatchWriter(tableName,
getMaxMemory(), getMaxLatency(TimeUnit.MILLISECONDS), getMaxWriteThreads());
}
}
Loading