Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #103 from bsura/2.0
Browse files Browse the repository at this point in the history
Expose more Asynchbase properties, refactor AsynchbaseSchemaService, …
  • Loading branch information
dilipdevaraj-sfdc authored Aug 23, 2016
2 parents 1952371 + ed4365b commit 3de47ed
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@

import com.salesforce.dva.argus.service.CollectionService;
import com.salesforce.dva.argus.service.MonitorService;
import com.salesforce.dva.argus.service.MonitorService.Counter;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -73,6 +75,7 @@ public void run() {

if (count > 0) {
LOGGER.info(MessageFormat.format("Committed {0} metrics for schema records creation.", count));
monitorService.modifyCounter(Counter.SCHEMACOMMIT_CLIENT_METRIC_WRITES, count, new HashMap<String,String>());
jobCounter.incrementAndGet();
}
Thread.sleep(POLL_INTERVAL_MS);
Expand All @@ -84,8 +87,9 @@ public void run() {
LOGGER.info("Error occured while committing metrics for schema records creation.", ex);
}
}
LOGGER.warn("Ending Schema Committer.");
LOGGER.warn(MessageFormat.format("Schema committer thread interrupted. {} metrics committed by this thread.", jobCounter.get()));
collectionService.dispose();
monitorService.dispose();
}
}
/* Copyright (c) 2016, Salesforce.com, Inc. All rights reserved. */
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public static enum Counter {
DATAPOINT_WRITES("argus.core", "datapoint.writes"),
UNIQUE_USERS("argus.core", "users.unique"),
COMMIT_CLIENT_DATAPOINT_WRITES("argus.core", "commit.client.datapoint.writes"),
COMMIT_CLIENT_METRIC_WRITES("argus.core", "commit.client.metric.writes");
COMMIT_CLIENT_METRIC_WRITES("argus.core", "commit.client.metric.writes"),
SCHEMACOMMIT_CLIENT_METRIC_WRITES("argus.core", "schemacommit.client.metric.writes");

private final String _scope;
private final String _metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.salesforce.dva.argus.system.SystemException;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

import org.apache.hadoop.hbase.util.Bytes;
import org.hbase.async.CompareFilter.CompareOp;
import org.hbase.async.Config;
Expand All @@ -58,6 +59,7 @@
import org.hbase.async.ScanFilter;
import org.hbase.async.Scanner;
import org.slf4j.Logger;

import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
Expand All @@ -66,6 +68,8 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* HBASE implementation of the schema service.
Expand All @@ -77,8 +81,8 @@ public class AsyncHbaseSchemaService extends DefaultService implements SchemaSer

//~ Static fields/initializers *******************************************************************************************************************

private static final byte[] METRIC_SCHEMA = "metric-schema".getBytes(Charset.forName("UTF-8"));
private static final byte[] SCOPE_SCHEMA = "scope-schema".getBytes(Charset.forName("UTF-8"));
private static final String METRIC_SCHEMA_TABLENAME = "metric-schema";
private static final String SCOPE_SCHEMA_TABLENAME = "scope-schema";
private static final byte[] COLUMN_FAMILY = "f".getBytes(Charset.forName("UTF-8"));
private static final byte[] COLUMN_QUALIFIER = "c".getBytes(Charset.forName("UTF-8"));
private static final byte[] CELL_VALUE = "1".getBytes(Charset.forName("UTF-8"));
Expand All @@ -89,17 +93,31 @@ public class AsyncHbaseSchemaService extends DefaultService implements SchemaSer

@SLF4JTypeListener.InjectLogger
private Logger _logger;
private HBaseClient _client;
private final HBaseClient _client;
private final boolean _syncPut;

//~ Constructors *********************************************************************************************************************************

@Inject
private AsyncHbaseSchemaService(SystemConfiguration systemConfig) {
super(systemConfig);
Config config = new Config();

config.overrideConfig("hbase.zookeeper.quorum",
systemConfig.getValue(Property.HBASE_ZOOKEEPER_CONNECT.getName(), Property.HBASE_ZOOKEEPER_CONNECT.getDefaultValue()));

_syncPut = Boolean.getBoolean(systemConfig.getValue(Property.HBASE_SYNC_PUT.getName(), Property.HBASE_SYNC_PUT.getDefaultValue()));

Config config = new Config();

config.overrideConfig("hbase.zookeeper.quorum",
systemConfig.getValue(Property.HBASE_ZOOKEEPER_CONNECT.getName(), Property.HBASE_ZOOKEEPER_CONNECT.getDefaultValue()));
config.overrideConfig("hbase.zookeeper.session.timeout",
systemConfig.getValue(Property.HBASE_ZOOKEEPER_SESSION_TIMEOUT.getName(), Property.HBASE_ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue()));

config.overrideConfig("hbase.rpcs.batch.size",
systemConfig.getValue(Property.HBASE_RPCS_BATCH_SIZE.getName(), Property.HBASE_RPCS_BATCH_SIZE.getDefaultValue()));
config.overrideConfig("hbase.rpcs.buffered_flush_interval",
systemConfig.getValue(Property.HBASE_RPCS_BUFFERED_FLUSH_INTERVAL.getName(), Property.HBASE_RPCS_BUFFERED_FLUSH_INTERVAL.getDefaultValue()));
config.overrideConfig("hbase.rpc.timeout",
systemConfig.getValue(Property.HBASE_RPC_TIMEOUT.getName(), Property.HBASE_RPC_TIMEOUT.getDefaultValue()));

config.overrideConfig("hbase.security.auth.enable",
systemConfig.getValue(Property.HBASE_SECURITY_AUTH_ENABLE.getName(), Property.HBASE_SECURITY_AUTH_ENABLE.getDefaultValue()));
config.overrideConfig("hbase.rpc.protection",
Expand All @@ -110,10 +128,11 @@ private AsyncHbaseSchemaService(SystemConfiguration systemConfig) {
systemConfig.getValue(Property.HBASE_KERBEROS_REGIONSERVER_PRINCIPAL.getName(), Property.HBASE_KERBEROS_REGIONSERVER_PRINCIPAL.getDefaultValue()));
config.overrideConfig("hbase.security.authentication",
systemConfig.getValue(Property.HBASE_SECURITY_AUTHENTICATION.getName(), Property.HBASE_SECURITY_AUTHENTICATION.getDefaultValue()));
config.overrideConfig("hbase.rpcs.batch.size", "16192");
config.overrideConfig("hbase.rpcs.buffered_flush_interval", "5000");
config.overrideConfig("hbase.zookeeper.session.timeout", "6000");

_client = new HBaseClient(config);

_ensureTableWithColumnFamilyExists(Bytes.toBytes(SCOPE_SCHEMA_TABLENAME), COLUMN_FAMILY);
_ensureTableWithColumnFamilyExists(Bytes.toBytes(METRIC_SCHEMA_TABLENAME), COLUMN_FAMILY);
}

//~ Methods **************************************************************************************************************************************
Expand Down Expand Up @@ -241,68 +260,18 @@ public void put(Metric metric) {
public void put(List<Metric> metrics) {
requireNotDisposed();
SystemAssert.requireArgument(metrics != null && !metrics.isEmpty(), "Metric list cannot be null or empty.");

for (Metric metric : metrics) {
if (metric.getTags().isEmpty()) {
String rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(), null, null, TableType.SCOPE);

_logger.trace(MessageFormat.format("Inserting row key {0} into table scope-schema", rowKeyStr));

final PutRequest scopePut = new PutRequest(SCOPE_SCHEMA, Bytes.toBytes(rowKeyStr), COLUMN_FAMILY, COLUMN_QUALIFIER, CELL_VALUE);
Deferred<Object> scopePutDeferred = _client.put(scopePut);

scopePutDeferred.addErrback(new Callback<Object, Exception>() {

@Override
public Object call(Exception e) throws Exception {
throw new SystemException("Error occured while trying to execute put() on scope schema table.", e);
}
});
rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(), null, null, TableType.METRIC);
_logger.trace(MessageFormat.format("Inserting row key {0} into table metric-schema", rowKeyStr));

final PutRequest metricPut = new PutRequest(METRIC_SCHEMA, Bytes.toBytes(rowKeyStr), COLUMN_FAMILY, COLUMN_QUALIFIER, CELL_VALUE);
Deferred<Object> metricPutDeferred = _client.put(metricPut);

metricPutDeferred.addErrback(new Callback<Object, Exception>() {

@Override
public Object call(Exception e) throws Exception {
throw new SystemException("Error occured while trying to execute put() on metric schema table.", e);
}
});
_putWithoutTag(metric, SCOPE_SCHEMA_TABLENAME);
_putWithoutTag(metric, METRIC_SCHEMA_TABLENAME);
}

for (Entry<String, String> tag : metric.getTags().entrySet()) {
String rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(), tag.getKey(), tag.getValue(),
TableType.SCOPE);

_logger.trace(MessageFormat.format("Inserting row key {0} into table scope-schema", rowKeyStr));

final PutRequest scopePut = new PutRequest(SCOPE_SCHEMA, Bytes.toBytes(rowKeyStr), COLUMN_FAMILY, COLUMN_QUALIFIER, CELL_VALUE);
Deferred<Object> scopePutDeferred = _client.put(scopePut);

scopePutDeferred.addErrback(new Callback<Object, Exception>() {

@Override
public Object call(Exception e) throws Exception {
throw new SystemException("Error occured while trying to execute put() on scope schema table.", e);
}
});
rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(), tag.getKey(), tag.getValue(),
TableType.METRIC);
_logger.trace(MessageFormat.format("Inserting row key {0} into table metric-schema", rowKeyStr));

final PutRequest metricPut = new PutRequest(METRIC_SCHEMA, Bytes.toBytes(rowKeyStr), COLUMN_FAMILY, COLUMN_QUALIFIER, CELL_VALUE);
Deferred<Object> metricPutDeferred = _client.put(metricPut);

metricPutDeferred.addErrback(new Callback<Object, Exception>() {

@Override
public Object call(Exception e) throws Exception {
throw new SystemException("Error occured while trying to execute put() on metric schema table.", e);
}
});
}
} // end for
_putWithTag(metric, tag, SCOPE_SCHEMA_TABLENAME);
_putWithTag(metric, tag, METRIC_SCHEMA_TABLENAME);
}
}
}

@Override
Expand Down Expand Up @@ -357,17 +326,21 @@ final class ScannerCB implements Callback<Object, ArrayList<ArrayList<KeyValue>>
* @return The list of metric schema records.
*/
public Object scan() {
_logger.debug("Getting next set of rows.");
return scanner.nextRows().addCallback(this);
}

@Override
public Object call(ArrayList<ArrayList<KeyValue>> rows) throws Exception {
_logger.debug("Inside nextRows() callback..");
try {
if (rows == null) {
results.callback(records);
scanner.close();
return null;
}

_logger.debug("Retrieved " + rows.size() + " rows.");
if (recordsToSkip >= rows.size()) {
recordsToSkip -= rows.size();
} else {
Expand Down Expand Up @@ -528,7 +501,79 @@ public Void call(Exception arg) throws Exception {
}
}
}


private void _ensureTableWithColumnFamilyExists(byte[] table, byte[] family) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean fail = new AtomicBoolean(false);
_client.ensureTableFamilyExists(table, family).addCallbacks(
new Callback<Object, Object>() {

@Override
public Object call(Object arg) throws Exception {
latch.countDown();
return null;
}

},
new Callback<Object, Object>() {

@Override
public Object call(Object arg) throws Exception {
fail.set(true);
latch.countDown();
return null;
}
}
);

try {
latch.await();
} catch (InterruptedException e) {
throw new SystemException("Interrupted", e);
}

if(fail.get()) {
throw new SystemException("Table or Column Family doesn't exist.");
}
}

private void _putWithoutTag(Metric metric, String tableName) {
String rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(),
null, null, TableType.fromTableName(tableName));
_put(tableName, rowKeyStr);
}

private void _putWithTag(Metric metric, Entry<String, String> tag, String tableName) {
String rowKeyStr = _constructRowKey(metric.getNamespace(), metric.getScope(), metric.getMetric(), tag.getKey(),
tag.getValue(), TableType.fromTableName(tableName));
_put(tableName, rowKeyStr);
}

private void _put(String tableName, String rowKeyStr) {
_logger.trace(MessageFormat.format("Inserting rowkey {0} into table {1}", rowKeyStr, tableName));

final PutRequest put = new PutRequest(Bytes.toBytes(tableName), Bytes.toBytes(rowKeyStr), COLUMN_FAMILY, COLUMN_QUALIFIER, CELL_VALUE);
Deferred<Object> deferred = _client.put(put);

if(_syncPut) {
deferred.addCallback(new Callback<Object, Object>() {
@Override
public Object call(Object arg) throws Exception {
_logger.trace(MessageFormat.format("Put to {0} successfully.", tableName));
return null;
}
});
}

deferred.addErrback(new Callback<Object, Exception>() {
@Override
public Object call(Exception e) throws Exception {
throw new SystemException("Error occured while trying to execute put().", e);
}
});
}

private String _getValueForType(MetricSchemaRecord record, RecordType type) {
switch (type) {
case NAMESPACE:
Expand Down Expand Up @@ -637,8 +682,8 @@ private ScanMetadata _constructScanMetadata(MetricSchemaRecordQuery query) {
*/
public static enum TableType {

SCOPE("scope-schema"),
METRIC("metric-schema");
SCOPE(SCOPE_SCHEMA_TABLENAME),
METRIC(METRIC_SCHEMA_TABLENAME);

private String _tableName;

Expand Down Expand Up @@ -680,13 +725,21 @@ public String getTableName() {
* @author Tom Valine ([email protected])
*/
public enum Property {

HBASE_ZOOKEEPER_CONNECT("service.property.schema.hbase.zookeeper.connect", "hbase.zookeeper.com:1234"),
HBASE_ZOOKEEPER_SESSION_TIMEOUT("service.property.schema.hbase.zookeeper.session.timeout", "6000"),

HBASE_SECURITY_AUTHENTICATION("service.property.schema.hbase.security.authentication", ""),
HBASE_RPC_PROTECTION("service.property.schema.hbase.rpc.protection", ""),
HBASE_SASL_CLIENTCONFIG("service.property.schema.hbase.sasl.clientconfig", "Client"),
HBASE_SECURITY_AUTH_ENABLE("service.property.schema.hbase.security.auth.enable", "false"),
HBASE_KERBEROS_REGIONSERVER_PRINCIPAL("service.property.schema.hbase.kerberos.regionserver.principal", "");
HBASE_KERBEROS_REGIONSERVER_PRINCIPAL("service.property.schema.hbase.kerberos.regionserver.principal", ""),

HBASE_RPCS_BATCH_SIZE("service.property.schema.hbase.rpcs.batch.size", "16192"),
HBASE_RPCS_BUFFERED_FLUSH_INTERVAL("service.property.schema.hbase.rpcs.buffered_flush_interval", "5000"),
HBASE_RPC_TIMEOUT("service.property.schema.hbase.rpc.timeout", "0"),

HBASE_SYNC_PUT("service.property.schema.hbase.sync.put", "false");

private final String _name;
private final String _defaultValue;
Expand Down

0 comments on commit 3de47ed

Please sign in to comment.