From 30bd741bbdc99f42cbe3491398406061cee2be80 Mon Sep 17 00:00:00 2001 From: bipinprasad Date: Mon, 4 Apr 2022 21:38:52 -0700 Subject: [PATCH] [STORM-3850] Collapse exceptions, remove unused variables/params, fix spelling, etc --- .../org/apache/storm/loadgen/CaptureLoad.java | 3 +- .../storm/perf/queuetest/JCQueuePerfTest.java | 3 +- .../storm/pmml/JpmmlRunnerTestTopology.java | 24 +-- .../apache/storm/starter/LambdaTopology.java | 7 +- .../apache/storm/blobstore/MigrateBlobs.java | 18 +- ...ObjectMapperCqlStatementMapperBuilder.java | 50 +++--- .../elasticsearch/common/EsConfigTest.java | 15 +- .../storm/eventhubs/bolt/EventHubBolt.java | 40 ++--- .../hdfs/bolt/AvroGenericRecordBoltTest.java | 13 +- .../apache/storm/kafka/bolt/KafkaBolt.java | 20 +-- .../apache/storm/pmml/model/ModelOutputs.java | 2 +- .../apache/storm/solr/mapper/SolrMapper.java | 2 +- .../apache/storm/flux/parser/FluxParser.java | 36 ++-- .../org/apache/storm/blobstore/BlobStore.java | 12 +- .../storm/cluster/StormClusterStateImpl.java | 2 +- .../storm/metrics2/StormMetricRegistry.java | 2 + .../storm/security/auth/ReqContext.java | 2 +- .../serialization/SerializationFactory.java | 8 +- .../org/apache/storm/streams/PairStream.java | 5 +- .../topology/BaseConfigurationDeclarer.java | 3 +- .../storm/topology/TopologyBuilder.java | 39 +++-- .../storm/trident/operation/Aggregator.java | 2 +- .../operation/impl/ReducerAggregatorImpl.java | 2 +- .../org/apache/storm/utils/ConfigUtils.java | 14 +- .../jvm/org/apache/storm/utils/JCQueue.java | 1 + .../src/jvm/org/apache/storm/utils/Utils.java | 154 ++++++++---------- .../SimpleWindowPartitionCache.java | 5 +- .../apache/storm/command/AdminCommands.java | 8 +- .../java/org/apache/storm/LocalCluster.java | 1 + .../storm/blobstore/LocalFsBlobStore.java | 21 ++- .../apache/storm/daemon/supervisor/Slot.java | 1 + .../sorter/ExecSorterByConnectionCount.java | 1 + .../sorter/ExecSorterByProximity.java | 1 + .../scheduling/sorter/NodeSorter.java | 1 + .../sorter/NodeSorterHostProximity.java | 11 +- .../TestConstraintSolverStrategy.java | 30 ++-- 36 files changed, 272 insertions(+), 287 deletions(-) diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java index 9ac2e57ee55..e3e60e7510f 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java @@ -271,7 +271,8 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top return new TopologyLoadConf(topologyName, savedTopoConf, spouts, bolts, streams); } - private static void addCpuMemToBuilders(Map boltBuilders, Map> boltResources) { + private static void addCpuMemToBuilders(Map boltBuilders, + Map> boltResources) { for (Map.Entry> entry: boltResources.entrySet()) { LoadCompConf.Builder bd = boltBuilders.get(entry.getKey()); if (bd != null) { diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java index 2ee34412eb7..86f8516d62a 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java @@ -44,7 +44,8 @@ public static void main(String[] args) throws Exception { //private static void ackingProducerSimulation() { // 
WaitStrategyPark ws = new WaitStrategyPark(100); // StormMetricRegistry registry = new StormMetricRegistry(); - // JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry); + // JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", + // Collections.singletonList(1000), 1000, registry); // JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry); // // final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java index 95c8160f94c..a5df1f08393 100644 --- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java +++ b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java @@ -65,7 +65,7 @@ public class JpmmlRunnerTestTopology { private String blobKey; // PMML Model downloaded from Blobstore - null if using File private String tplgyName = "test"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) { try { JpmmlRunnerTestTopology testTopology = new JpmmlRunnerTestTopology(); testTopology.parseArgs(args); @@ -77,10 +77,10 @@ public static void main(String[] args) throws Exception { } private void parseArgs(String[] args) { - if (Arrays.stream(args).anyMatch(option -> option.equals("-h"))) { + if (Arrays.asList(args).contains("-h")) { printUsage(); - } else if (Arrays.stream(args).anyMatch(option -> option.equals("-f")) - && Arrays.stream(args).anyMatch(option -> option.equals("-b"))) { + } else if (Arrays.asList(args).contains("-f") + && Arrays.asList(args).contains("-b")) { System.out.println("Please specify only one option of [-b, -f]"); printUsage(); } else { @@ -116,12 +116,12 @@ private void parseArgs(String[] args) { private void setDefaults() { if (blobKey == null) { // blob key not specified, use file if (pmml == null) { - pmml = loadExample(pmml, PMML_MODEL_FILE); + pmml = loadExample(PMML_MODEL_FILE); } } if (rawInputs == null) { - rawInputs = loadExample(rawInputs, RAW_INPUTS_FILE); + rawInputs = loadExample(RAW_INPUTS_FILE); } if (tplgyName == null) { @@ -129,8 +129,12 @@ private void setDefaults() { } } - private File loadExample(File file, String example) { + private File loadExample(String example) { + File file; try (InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(example)) { + if (stream == null) { + throw new RuntimeException("Error loading example=" + example + ", stream is null"); + } file = File.createTempFile("pmml-example", ".tmp"); IOUtils.copy(stream, new FileOutputStream(file)); } catch (IOException e) { @@ -147,8 +151,8 @@ private static void printUsage() { } private void run() throws Exception { - System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]", - blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath())); + System.out.printf("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]%n", + blobKey != null ? 
"Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()); submitTopologyRemoteCluster(newTopology(), newConfig()); } @@ -171,7 +175,7 @@ private Config newConfig() { return config; } - private IRichBolt newBolt() throws Exception { + private IRichBolt newBolt() { final List streams = Lists.newArrayList(Utils.DEFAULT_STREAM_ID, NON_DEFAULT_STREAM_ID); if (blobKey != null) { // Load PMML Model from Blob store final ModelOutputs outFields = JpmmlModelOutputs.toStreams(blobKey, streams); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index e7c134b58c6..7c800e3b69a 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -12,7 +12,6 @@ package org.apache.storm.starter; -import java.io.Serializable; import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.topology.ConfigurableTopology; @@ -42,10 +41,10 @@ protected int run(String[] args) throws Exception { builder.setSpout("spout1", () -> UUID.randomUUID().toString()); builder.setBolt("bolt1", (tuple, collector) -> { - String[] parts = tuple.getStringByField("lambda").split("\\-"); + String[] parts = tuple.getStringByField("lambda").split("-"); collector.emit(new Values(prefix + parts[0] + suffix, tag)); }, "strValue", "intValue").shuffleGrouping("spout1"); - builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); + builder.setBolt("bolt2", System.out::println).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java index e7a3581637d..a05d0ac74e4 100644 --- a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -23,11 +23,8 @@ import java.util.Map; import javax.security.auth.Subject; -import javax.security.auth.login.LoginContext; import org.apache.storm.Config; -import org.apache.storm.blobstore.BlobStore; -import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.KeyNotFoundException; @@ -47,9 +44,8 @@ protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws A } } - protected static void copyBlobStoreKeys(BlobStore bsFrom, - Subject whoFrom, - BlobStore bsTo, Subject whoTo) throws AuthorizationException, + protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo) + throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { @@ -63,13 +59,12 @@ protected static void copyBlobStoreKeys(BlobStore bsFrom, System.out.println("DONE CREATING BLOB " + key); } } - - + public static void main(String[] args) throws Exception { Map hdfsConf = Utils.readStormConfig(); if (args.length < 2) { - System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length)); + System.out.println("Need at least 2 arguments, but have " + args.length); System.out.println("migrate "); System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore"); System.out.println("Example: migrate '/srv/storm' " @@ -104,8 +99,7 @@ public static void main(String[] args) throws Exception { HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(); hdfsBlobStore.prepare(hdfsConf, null, null, null); - - + /* LOOK AT LOCAL BLOBSTORE */ System.out.println("Listing local blobstore keys."); MigratorMain.listBlobStoreKeys(lfsBlobStore, null); diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java index d7961d85874..eae4766b18e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at @@ -60,29 +60,29 @@ public ObjectMapperCqlStatementMapper build() { codecs, udtClasses); } - public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { - this.codecProducers.addAll(codecProducer); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { - this.udtClasses.addAll(udtClass); - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { - this.timestampField = timestampField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { - this.ttlField = ttlField; - return this; - } - - public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { - this.consistencyLevelField = consistencyLevelField; - return this; - } + //public ObjectMapperCqlStatementMapperBuilder withCodecs(List>> codecProducer) { + // this.codecProducers.addAll(codecProducer); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withUdtCodecs(List> udtClass) { + // this.udtClasses.addAll(udtClass); + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTimestampField(String timestampField) { + // this.timestampField = timestampField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withTtlField(String ttlField) { + // this.ttlField = ttlField; + // return this; + //} + // + //public ObjectMapperCqlStatementMapperBuilder withConsistencyLevelField(String consistencyLevelField) { + // this.consistencyLevelField = consistencyLevelField; + // return this; + //} } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java index 23f00ca5dcd..8709037f4ec 100644 --- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsConfigTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -17,23 +17,22 @@ */ package org.apache.storm.elasticsearch.common; -import static org.junit.Assert.assertEquals; - -import org.apache.http.HttpHost; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.testing.NullPointerTester; -import org.junit.jupiter.api.Assertions; +import org.apache.http.HttpHost; import org.junit.jupiter.api.Test; public class EsConfigTest { @Test - public void urlsCannotBeEmpty() throws Exception { - Assertions.assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); + public void urlsCannotBeEmpty() { + assertThrows(IllegalArgumentException.class, () -> new EsConfig(new String[] {})); } @Test - public void constructorThrowsOnNull() throws Exception { + public void constructorThrowsOnNull() { new NullPointerTester().testAllPublicConstructors(EsConfig.class); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 4c1e0684333..f1b2c1e661e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- *******************************************************************************/ + */ package org.apache.storm.eventhubs.bolt; @@ -38,23 +38,22 @@ */ public class EventHubBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory - .getLogger(EventHubBolt.class); + private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class); protected OutputCollector collector; protected PartitionSender sender; protected EventHubClient ehClient; protected EventHubBoltConfig boltConfig; - public EventHubBolt(String connectionString, String entityPath) { - boltConfig = new EventHubBoltConfig(connectionString, entityPath); - } - - public EventHubBolt(String userName, String password, String namespace, - String entityPath, boolean partitionMode) { - boltConfig = new EventHubBoltConfig(userName, password, namespace, - entityPath, partitionMode); - } + //public EventHubBolt(String connectionString, String entityPath) { + // boltConfig = new EventHubBoltConfig(connectionString, entityPath); + //} + // + //public EventHubBolt(String userName, String password, String namespace, + // String entityPath, boolean partitionMode) { + // boltConfig = new EventHubBoltConfig(userName, password, namespace, + // entityPath, partitionMode); + //} public EventHubBolt(EventHubBoltConfig config) { boltConfig = config; @@ -97,12 +96,9 @@ public void execute(Tuple tuple) { throw new EventHubException("ehclient is null"); } collector.ack(tuple); - } catch (EventHubException ex) { + } catch (EventHubException | ServiceBusException ex) { collector.reportError(ex); collector.fail(tuple); - } catch (ServiceBusException e) { - collector.reportError(e); - collector.fail(tuple); } } @@ -113,17 +109,15 @@ public void cleanup() { sender.close().whenComplete((voidargs, error) -> { try { if (error != null) { - logger.error("Exception during sender cleanup phase" + error.toString()); + logger.error("Exception during sender cleanup phase" + error); } ehClient.closeSync(); } catch (Exception e) { - logger.error("Exception during ehclient cleanup phase" + e.toString()); + logger.error("Exception during ehclient cleanup phase" + e); } }).get(); - } catch (InterruptedException e) { - logger.error("Exception occured during cleanup phase" + e.toString()); - } catch (ExecutionException e) { - logger.error("Exception occured during cleanup phase" + e.toString()); + } catch (InterruptedException | ExecutionException e) { + logger.error("Exception occurred during cleanup phase" + e); } logger.info("Eventhub Bolt cleaned up"); sender = null; diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java index a919c921e73..9d28876f48b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at @@ -68,10 +68,9 @@ public class AvroGenericRecordBoltTest { + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; - private static Schema schema1; - private static Schema schema2; private static Tuple tuple1; private static Tuple tuple2; + @Rule public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(() -> { Configuration conf = new Configuration(); @@ -92,10 +91,10 @@ public class AvroGenericRecordBoltTest { @BeforeClass public static void setupClass() { Schema.Parser parser = new Schema.Parser(); - schema1 = parser.parse(schemaV1); + Schema schema1 = parser.parse(schemaV1); parser = new Schema.Parser(); - schema2 = parser.parse(schemaV2); + Schema schema2 = parser.parse(schemaV2); GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1); builder1.set("foo1", "bar1"); @@ -110,8 +109,8 @@ public static void setupClass() { private static Tuple generateTestTuple(GenericRecord record) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), - new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), + new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { @Override public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("record"); diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java index f3fc8080541..d4a1c538c14 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -70,7 +70,7 @@ public class KafkaBolt extends BaseTickTupleAwareRichBolt { */ private boolean fireAndForget = false; /** - * {@see KafkaBolt#setAsync(boolean)} for more details on this. + * {@see KafkaBolt#setAsync(boolean)} for more details on this. */ private boolean async = true; @@ -116,7 +116,7 @@ public void prepare(Map topoConf, TopologyContext context, Outpu //for backward compatibility. if (mapper == null) { LOG.info("Mapper not specified. Setting default mapper to {}", FieldNameBasedTupleToKafkaMapper.class.getSimpleName()); - this.mapper = new FieldNameBasedTupleToKafkaMapper(); + this.mapper = new FieldNameBasedTupleToKafkaMapper<>(); } //for backward compatibility. 
@@ -169,9 +169,9 @@ private Callback createProducerCallback(final Tuple input) { @Override protected void process(final Tuple input) { - K key = null; - V message = null; - String topic = null; + K key; + V message; + String topic; try { key = mapper.getKeyFromTuple(input); message = mapper.getMessageFromTuple(input); @@ -238,10 +238,10 @@ public void setAsync(boolean async) { @Override public String toString() { - return "KafkaBolt: {mapper: " + mapper + return "KafkaBolt: {mapper: " + mapper + " topicSelector: " + topicSelector - + " fireAndForget: " + fireAndForget - + " async: " + async - + " proerties: " + boltSpecifiedProperties; + + " fireAndForget: " + fireAndForget + + " async: " + async + + " properties: " + boltSpecifiedProperties; } } diff --git a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java index 8322a8a86c0..547898eb4ef 100644 --- a/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java +++ b/external/storm-pmml/src/main/java/org/apache/storm/pmml/model/ModelOutputs.java @@ -38,7 +38,7 @@ public interface ModelOutputs extends Serializable { /** * Convenience method that returns a set with all the streams declared by the {@link PMMLPredictorBolt}. - * By default this this method calls {@link #streamFields()}{@code .keySet()}. + * By default, this method calls {@link #streamFields()}{@code .keySet()}. * @return The set with all declared streams */ default Set streams() { diff --git a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java index 1a083a2aa97..d48b0e11169 100644 --- a/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java +++ b/external/storm-solr/src/main/java/org/apache/storm/solr/mapper/SolrMapper.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index 50570e11d06..7f75072d788 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -56,10 +56,10 @@ private FluxParser() { * * @param inputFile source YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes, @@ -72,14 +72,14 @@ public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean } /** - * Parse a flux topology definition from a classpath resource.. + * Parse a flux topology definition from a classpath resource. * * @param resource YAML resource * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution - * @return resulting topologuy definition + * @param envSub whether to perform environment variable substitution + * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes, @@ -96,9 +96,9 @@ public static TopologyDef parseResource(String resource, boolean dumpYaml, boole * * @param inputStream InputStream representation of YAML file * @param dumpYaml if true, dump the parsed YAML to stdout - * @param processIncludes whether or not to process includes + * @param processIncludes whether to process includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return resulting topology definition * @throws IOException if there is a problem reading file(s) */ @@ -118,7 +118,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump } if (processIncludes) { - return processIncludes(yaml, topology, properties, envSub); + return processIncludes(topology, properties, envSub); } else { return topology; } @@ -128,7 +128,7 @@ public static TopologyDef parseInputStream(InputStream inputStream, boolean dump * Parse filter properties file. 
* * @param propertiesFile properties file for variable substitution - * @param resource whether or not to load properties file from classpath + * @param resource whether to load properties file from classpath * @return resulting filter properties * @throws IOException if there is a problem reading file */ @@ -137,7 +137,7 @@ public static Properties parseProperties(String propertiesFile, boolean resource if (propertiesFile != null) { properties = new Properties(); - InputStream in = null; + InputStream in; if (resource) { in = FluxParser.class.getResourceAsStream(propertiesFile); } else { @@ -165,7 +165,7 @@ private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties proper : line; }).collect(Collectors.joining(System.lineSeparator())); - return (TopologyDef) yaml.load(conf); + return yaml.load(conf); } } @@ -203,25 +203,23 @@ private static Yaml yaml() { Constructor constructor = new Constructor(TopologyDef.class); constructor.addTypeDescription(topologyDescription); - Yaml yaml = new Yaml(constructor); - return yaml; + return new Yaml(constructor); } /** * Process includes contained within a yaml file. * - * @param yaml the yaml parser for parsing the include file(s) * @param topologyDef the topology definition containing (possibly zero) includes * @param properties properties file for variable substitution - * @param envSub whether or not to perform environment variable substitution + * @param envSub whether to perform environment variable substitution * @return The TopologyDef with includes resolved. */ - private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub) + private static TopologyDef processIncludes(TopologyDef topologyDef, Properties properties, boolean envSub) throws IOException { //TODO support multiple levels of includes if (topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()) { - TopologyDef includeTopologyDef = null; + TopologyDef includeTopologyDef; if (include.isResource()) { LOG.info("Loading includes from resource: {}", include.getFile()); includeTopologyDef = parseResource(include.getFile(), true, false, properties, envSub); diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java index 1a67f9323bf..58bcc7cbaef 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -149,7 +149,7 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec * Updates the blob data. * * @param key Key for the blob - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob * @return AtomicOutputStream returns a stream into which the data can be written */ public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -185,7 +185,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). 
*/ public abstract void setLeaderElector(ILeaderElector leaderElector); @@ -195,7 +195,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio * * @param key Key for the blob * @param meta Metadata which contains the updated acls information - * @param who Is the subject having the write privilege for the blob + * @param who Is the subject with write privilege for the blob */ public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException; @@ -217,7 +217,7 @@ public void updateBlob(String key, byte[] data, Subject who) throws Authorizatio public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException; /** - * Returns an iterator with all the list of keys currently available on the blob store. + * Returns an iterator with all the list of keys currently available in the blob store. * * @return {@code Iterator} */ @@ -308,7 +308,7 @@ public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundE /** * Get IDs stored in blob store. - * @return a set of all of the topology ids with special data stored in the blob store. + * @return a set of all topology ids with special data stored in the blob store. */ public Set storedTopoIds() { return filterAndListKeys(TO_TOPO_ID); @@ -324,7 +324,7 @@ public void updateLastBlobUpdateTime() throws IOException { } /** - * Validates that the blob update time of the blobstore is up to date with the current existing blobs. + * Validates that the blob update time of the blobstore is up-to-date with the current existing blobs. * * @throws IOException on any error */ diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 33239d5b246..9ccf076ec1d 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -530,7 +530,7 @@ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) { * @param stormId The topology Id * @param timeoutMs How long until the backpressure znode is invalid. * @param callback The callback function - * @return True is backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise. + * @return True only when backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out. 
*/ @Override public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) { diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index c442f3f4dc3..7a836b04f64 100644 --- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -349,10 +349,12 @@ public int getRateCounterUpdateIntervalSeconds() { return RATE_COUNTER_UPDATE_INTERVAL_SECONDS; } + @Override public MetricRegistry getRegistry() { return registry; } + @Override public Map getTaskMetrics() { return taskMetrics; } diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java index 11a1cdf2d5f..5df94a47df1 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ReqContext.java @@ -92,7 +92,7 @@ public String toString() { + ", reqId=" + reqId + ", remoteAddr=" + remoteAddr + ", authZPrincipal=" + ((principal() != null) ? principal().getName() : "null") - + ", ThreadId=" + Thread.currentThread().toString() + + ", ThreadId=" + Thread.currentThread() + '}'; } diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java index 3e7feef0c8a..56982411381 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java @@ -52,7 +52,7 @@ public class SerializationFactory { public static final ServiceLoader loader = ServiceLoader.load(SerializationRegister.class); public static Kryo getKryo(Map conf) { - IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); + IKryoFactory kryoFactory = ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); Kryo k = kryoFactory.getKryo(conf); k.register(byte[].class); @@ -108,9 +108,7 @@ public static Kryo getKryo(Map conf) { } else { throw new RuntimeException(e); } - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { + } catch (InstantiationException | IllegalAccessException e) { throw new RuntimeException(e); } } @@ -231,7 +229,7 @@ public IdDictionary(StormTopology topology) { *
Note: Only one key wins if there are duplicate values. Which key wins is indeterminate: "{:a 1 :b 1} -> {1 :a} *or* {1 :b}" */ private static Map simpleReverseMap(Map map) { - Map ret = new HashMap(); + Map ret = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { ret.put(entry.getValue(), entry.getKey()); } diff --git a/storm-client/src/jvm/org/apache/storm/streams/PairStream.java b/storm-client/src/jvm/org/apache/storm/streams/PairStream.java index 2d8e578536d..cc0ba5990e9 100644 --- a/storm-client/src/jvm/org/apache/storm/streams/PairStream.java +++ b/storm-client/src/jvm/org/apache/storm/streams/PairStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -168,6 +168,7 @@ public PairStream peek(Consumer> action) { /** * {@inheritDoc} */ + @Override public PairStream filter(Predicate> predicate) { return toPairStream(super.filter(predicate)); } @@ -534,7 +535,7 @@ public ArrayList apply(ArrayList aggregate, V value) { @Override public ArrayList merge(ArrayList accum1, ArrayList accum2) { - ArrayList res = new ArrayList(); + ArrayList res = new ArrayList<>(); res.addAll(accum1); res.addAll(accum2); return res; diff --git a/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java b/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java index 830b2edb17e..218c6fc80d0 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java +++ b/storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -88,6 +88,7 @@ public T setCPULoad(Number amount) { return (T) this; } + @Override @SuppressWarnings("unchecked") public T addResource(String resourceName, Number resourceValue) { Map resourcesMap = (Map) getComponentConfiguration() diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index c14d6fb2112..82de58c4941 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at @@ -94,8 +94,8 @@ public class TopologyBuilder { private final Map sharedMemory = new HashMap<>(); private boolean hasStatefulBolt = false; - private Map stateSpouts = new HashMap<>(); - private List workerHooks = new ArrayList<>(); + private final Map stateSpouts = new HashMap<>(); + private final List workerHooks = new ArrayList<>(); private static String mergeIntoJson(Map into, Map newMap) { Map res = new HashMap<>(into); @@ -251,7 +251,7 @@ public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumen * outputs. * @param bolt the windowed bolt * @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process - * somwehere around the cluster. + * somewhere around the cluster. * @return use the returned object to declare the inputs to this component * * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -292,7 +292,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) * outputs. * @param bolt the stateful bolt * @param parallelismHint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process - * somwehere around the cluster. + * somewhere around the cluster. * @return use the returned object to declare the inputs to this component * * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -300,7 +300,7 @@ public BoltDeclarer setBolt(String id, IStatefulBolt bolt) public BoltDeclarer setBolt(String id, IStatefulBolt bolt, Number parallelismHint) throws IllegalArgumentException { hasStatefulBolt = true; - return setBolt(id, new StatefulBoltExecutor(bolt), parallelismHint); + return setBolt(id, new StatefulBoltExecutor<>(bolt), parallelismHint); } /** @@ -330,7 +330,7 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt the type of the state (e.g. 
{@link org.apache.storm.state.KeyValueState}) * @return use the returned object to declare the inputs to this component * @@ -343,9 +343,9 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt(bolt); } else { - executor = new StatefulWindowedBoltExecutor(bolt); + executor = new StatefulWindowedBoltExecutor<>(bolt); } - return setBolt(id, new StatefulBoltExecutor(executor), parallelismHint); + return setBolt(id, new StatefulBoltExecutor<>(executor), parallelismHint); } /** @@ -553,7 +553,7 @@ private ComponentCommon getComponentCommon(String id, IComponent component) { private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException { ComponentCommon common = new ComponentCommon(); - common.set_inputs(new HashMap()); + common.set_inputs(new HashMap<>()); if (parallelism != null) { int dop = parallelism.intValue(); if (dop < 1) { @@ -646,69 +646,84 @@ public SpoutGetter(String id) { } protected class BoltGetter extends ConfigGetter implements BoltDeclarer { - private String boltId; + private final String boltId; public BoltGetter(String boltId) { super(boltId); this.boltId = boltId; } + @Override public BoltDeclarer fieldsGrouping(String componentId, Fields fields) { return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields); } + @Override public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) { return grouping(componentId, streamId, Grouping.fields(fields.toList())); } + @Override public BoltDeclarer globalGrouping(String componentId) { return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer globalGrouping(String componentId, String streamId) { - return grouping(componentId, streamId, Grouping.fields(new ArrayList())); + return grouping(componentId, streamId, Grouping.fields(new ArrayList<>())); } + @Override public BoltDeclarer shuffleGrouping(String componentId) { return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer shuffleGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.shuffle(new NullStruct())); } + @Override public BoltDeclarer localOrShuffleGrouping(String componentId) { return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct())); } + @Override public BoltDeclarer noneGrouping(String componentId) { return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer noneGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.none(new NullStruct())); } + @Override public BoltDeclarer allGrouping(String componentId) { return allGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer allGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.all(new NullStruct())); } + @Override public BoltDeclarer directGrouping(String componentId) { return directGrouping(componentId, Utils.DEFAULT_STREAM_ID); } + @Override public BoltDeclarer directGrouping(String componentId, String streamId) { return grouping(componentId, streamId, Grouping.direct(new NullStruct())); } + /* does not implement or override a super or interface */ private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) { commons.get(boltId).put_to_inputs(new 
GlobalStreamId(componentId, streamId), grouping); return this; diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java b/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java index ecfacbf4062..71ecae61068 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/Aggregator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java index bf208cb35b3..7f07f27144c 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/ReducerAggregatorImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 68e346d570a..9cfd432669b 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -79,11 +79,7 @@ public static ConfigUtils setInstance(ConfigUtils u) { } public static Map maskPasswords(final Map conf) { - Maps.EntryTransformer maskPasswords = new Maps.EntryTransformer() { - public Object transformEntry(String key, Object value) { - return passwordConfigKeys.contains(key) ? "*****" : value; - } - }; + Maps.EntryTransformer maskPasswords = (key, value) -> passwordConfigKeys.contains(key) ? 
"*****" : value; return Maps.transformEntries(conf, maskPasswords); } @@ -109,7 +105,7 @@ public static boolean isLocalMode(Map conf) { */ public static Collection readDirContents(String dir) { Collection ret = readDirFiles(dir); - return ret.stream().map(car -> car.getName()).collect(Collectors.toList()); + return ret.stream().map(File::getName).collect(Collectors.toList()); } /** @@ -122,9 +118,7 @@ public static Collection readDirFiles(String dir) { Collection ret = new HashSet<>(); File[] files = new File(dir).listFiles(); if (files != null) { - for (File f : files) { - ret.add(f); - } + ret.addAll(Arrays.asList(files)); } return ret; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index f330b427f58..6aa668f566d 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -254,6 +254,7 @@ private interface Inserter { } public interface Consumer extends MessagePassingQueue.Consumer { + @Override void accept(Object event); void flush() throws InterruptedException; diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index a634402edda..55ad83a32cf 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -116,10 +116,10 @@ public class Utils { private static final Set> defaultAllowedExceptions = Collections.emptySet(); private static final List LOCALHOST_ADDRESSES = Lists.newArrayList("localhost", "127.0.0.1", "0:0:0:0:0:0:0:1"); static SerializationDelegate serializationDelegate; - private static ThreadLocal threadSer = new ThreadLocal(); - private static ThreadLocal threadDes = new ThreadLocal(); + private static final ThreadLocal threadSer = new ThreadLocal<>(); + private static final ThreadLocal threadDes = new ThreadLocal<>(); private static ClassLoader cl = null; - private static Map localConf; + private static final Map localConf; // A singleton instance allows us to mock delegated static methods in our // tests by subclassing. 
private static Utils _instance = new Utils(); @@ -159,7 +159,7 @@ public static void resetClassLoaderForJavaDeSerialize() { public static List findResources(String name) { try { Enumeration resources = Thread.currentThread().getContextClassLoader().getResources(name); - List ret = new ArrayList(); + List ret = new ArrayList<>(); while (resources.hasMoreElements()) { ret.add(resources.nextElement()); } @@ -176,8 +176,7 @@ public static Map findAndReadConfigFile(String name, boolean mus in = getConfigFileInputStream(name); if (null != in) { Yaml yaml = new Yaml(new SafeConstructor()); - @SuppressWarnings("unchecked") - Map ret = (Map) yaml.load(new InputStreamReader(in)); + Map ret = yaml.load(new InputStreamReader(in)); if (null != ret) { return new HashMap<>(ret); } else { @@ -218,7 +217,7 @@ private static InputStream getConfigFileInputStream(String configFilePath) "Could not find config file, name not specified"); } - HashSet resources = new HashSet(findResources(configFilePath)); + HashSet resources = new HashSet<>(findResources(configFilePath)); if (resources.isEmpty()) { File configFile = new File(configFilePath); if (configFile.exists()) { @@ -385,41 +384,37 @@ public static boolean isSystemId(String id) { public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh, int priority, final boolean isFactory, boolean startImmediately, String threadName) { - SmartThread thread = new SmartThread(new Runnable() { - public void run() { - try { - final Callable fn = isFactory ? (Callable) afn.call() : afn; - while (true) { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - final Long s = fn.call(); - if (s == null) { // then stop running it - break; - } - if (s > 0) { - Time.sleep(s); - } + SmartThread thread = new SmartThread(() -> { + try { + final Callable fn = (Callable) (isFactory ? 
afn.call() : afn); + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + final Long s = fn.call(); + if (s == null) { // then stop running it + break; } - } catch (Throwable t) { - if (Utils.exceptionCauseIsInstanceOf( - InterruptedException.class, t)) { - LOG.info("Async loop interrupted!"); - return; + if (s > 0) { + Time.sleep(s); } - LOG.error("Async loop died!", t); - throw new RuntimeException(t); } + } catch (Throwable t) { + if (Utils.exceptionCauseIsInstanceOf( + InterruptedException.class, t)) { + LOG.info("Async loop interrupted!"); + return; + } + LOG.error("Async loop died!", t); + throw new RuntimeException(t); } }); if (eh != null) { thread.setUncaughtExceptionHandler(eh); } else { - thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Async loop died!", e); - Utils.exitProcess(1, "Async loop died!"); - } + thread.setUncaughtExceptionHandler((t, e) -> { + LOG.error("Async loop died!", e); + Utils.exitProcess(1, "Async loop died!"); }); } thread.setDaemon(isDaemon); @@ -540,7 +535,7 @@ public static T javaDeserialize(byte[] serialized, Class clazz) { try { ByteArrayInputStream bis = new ByteArrayInputStream(serialized); - ObjectInputStream ois = null; + ObjectInputStream ois; if (null == Utils.cl) { ois = new ObjectInputStream(bis); } else { @@ -550,10 +545,8 @@ public static T javaDeserialize(byte[] serialized, Class clazz) { Object ret = ois.readObject(); ois.close(); return (T) ret; - } catch (IOException ioe) { + } catch (IOException | ClassNotFoundException ioe) { throw new RuntimeException(ioe); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); } } @@ -592,8 +585,8 @@ public static Id parseZkId(String id, String configName) { /** * Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled. * - * @param conf the config to get the super User ACL from - * @return the super user ACL. + * @param conf the config to get the superuser ACL from + * @return the superuser ACL. */ @SuppressWarnings("checkstyle:AbbreviationAsWordInName") public static ACL getSuperUserAcl(Map conf) { @@ -671,7 +664,7 @@ public static void handleWorkerUncaughtException(Throwable t) { handleUncaughtException(t, defaultAllowedExceptions, true); } - // Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException + // Hadoop UserGroupInformation can launch an auto-renewal thread that can cause a NullPointerException // for workers. See STORM-3606 for an explanation. private static boolean isAllowedWorkerException(Throwable t) { if (t instanceof NullPointerException) { @@ -756,14 +749,14 @@ public static UptimeComputer makeUptimeComputer() { *

Example usage in java:
     * Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);
-     * <p>The order of he resulting list values depends on the ordering properties of the Map passed in. The caller is
+     * <p>
The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is * responsible for passing an ordered map if they expect the result to be consistently ordered as well. * * @param map to reverse * @return a reversed map */ public static HashMap> reverseMap(Map map) { - HashMap> rtn = new HashMap>(); + HashMap> rtn = new HashMap<>(); if (map == null) { return rtn; } @@ -772,7 +765,7 @@ public static HashMap> reverseMap(Map map) { V val = entry.getValue(); List list = rtn.get(val); if (list == null) { - list = new ArrayList(); + list = new ArrayList<>(); rtn.put(entry.getValue(), list); } list.add(key); @@ -794,11 +787,7 @@ public static Map> reverseMap(List> listSeq) { for (List listEntry : listSeq) { Object key = listEntry.get(0); Object val = listEntry.get(1); - List list = rtn.get(val); - if (list == null) { - list = new ArrayList<>(); - rtn.put(val, list); - } + List list = rtn.computeIfAbsent(val, k -> new ArrayList<>()); list.add(key); } return rtn; @@ -862,18 +851,13 @@ public static byte[] toByteArray(ByteBuffer buffer) { } public static Runnable mkSuicideFn() { - return new Runnable() { - @Override - public void run() { - exitProcess(1, "Worker died"); - } - }; + return () -> exitProcess(1, "Worker died"); } public static void readAndLogStream(String prefix, InputStream in) { try { BufferedReader r = new BufferedReader(new InputStreamReader(in)); - String line = null; + String line; while ((line = r.readLine()) != null) { LOG.info("{}:{}", prefix, line); } @@ -915,10 +899,8 @@ public static ComponentCommon getComponentCommon(StormTopology topology, String } public static List tuple(Object... values) { - List ret = new ArrayList(); - for (Object v : values) { - ret.add(v); - } + List ret = new ArrayList<>(); + Collections.addAll(ret, values); return ret; } @@ -940,7 +922,7 @@ public static byte[] gunzip(byte[] data) { ByteArrayInputStream bis = new ByteArrayInputStream(data); GZIPInputStream in = new GZIPInputStream(bis); byte[] buffer = new byte[1024]; - int len = 0; + int len; while ((len = in.read(buffer)) >= 0) { bos.write(buffer, 0, len); } @@ -953,8 +935,8 @@ public static byte[] gunzip(byte[] data) { } public static List getRepeat(List list) { - List rtn = new ArrayList(); - Set idSet = new HashSet(); + List rtn = new ArrayList<>(); + Set idSet = new HashSet<>(); for (String id : list) { if (idSet.contains(id)) { @@ -987,7 +969,7 @@ public static Object getSetComponentObject(ComponentObject obj) { /** * A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. * When the input number is negative, the returned positive value is the original value bit AND against Integer.MAX_VALUE(0x7fffffff) - * which is not its absolutely value. + * which is not its absolute value. * * @param number a given number * @return a positive number. @@ -1022,8 +1004,8 @@ public static Map fromCompressedJsonConf(byte[] serialized) { } /** - * Creates a new map with a string value in the map replaced with an equivalently-lengthed string of '#'. (If the object is not a - * string to string will be called on it and replaced) + * Creates a new map with a string value in the map replaced with an equally long string of '#'. 
(If the object is not a + * string toString() will be called on it and replaced) * * @param m The map that a value will be redacted from * @param key The key pointing to the value to be redacted @@ -1101,7 +1083,7 @@ public static Double parseJvmHeapMemByChildOpts(List options, Double def default: unit = 1; } - Double result = value * unit / 1024.0 / 1024.0; + double result = value * unit / 1024.0 / 1024.0; return (result < 1.0) ? 1.0 : result; } } @@ -1112,7 +1094,7 @@ public static Double parseJvmHeapMemByChildOpts(List options, Double def } public static ClientBlobStore getClientBlobStore(Map conf) { - ClientBlobStore store = (ClientBlobStore) ReflectionUtils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE)); + ClientBlobStore store = ReflectionUtils.newInstance((String) conf.get(Config.CLIENT_BLOBSTORE)); store.prepare(conf); return store; } @@ -1142,9 +1124,7 @@ private static Map normalizeConf(Map conf) { return new HashMap<>(); } Map ret = new HashMap<>(conf); - for (Map.Entry entry : ret.entrySet()) { - ret.put(entry.getKey(), normalizeConfValue(entry.getValue())); - } + ret.replaceAll((k, v) -> normalizeConfValue(v)); return ret; } @@ -1378,7 +1358,7 @@ public static TreeMap integerDivided(int sum, int numPieces) { int base = sum / numPieces; int numInc = sum % numPieces; int numBases = numPieces - numInc; - TreeMap ret = new TreeMap(); + TreeMap ret = new TreeMap<>(); ret.put(base, numBases); if (numInc != 0) { ret.put(base + 1, numInc); @@ -1406,8 +1386,8 @@ public static List> partitionFixed(int maxNumChunks, Collection c Map parts = integerDivided(coll.size(), maxNumChunks); // Keys sorted in descending order - List sortedKeys = new ArrayList(parts.keySet()); - Collections.sort(sortedKeys, Collections.reverseOrder()); + List sortedKeys = new ArrayList<>(parts.keySet()); + sortedKeys.sort(Collections.reverseOrder()); Iterator it = coll.iterator(); @@ -1664,17 +1644,16 @@ public static T getCompatibleVersion(NavigableMap versione } return defaultValue; } - LOG.warn("Could not find a higer compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey()); + LOG.warn("Could not find a higher compatible version for {} {}, using {} instead", what, desiredVersion, ret.getKey()); } return ret.getValue(); } - @SuppressWarnings("unchecked") private static Map readConfIgnoreNotFound(Yaml yaml, File f) throws IOException { Map ret = null; if (f.exists()) { try (FileReader fr = new FileReader(f)) { - ret = (Map) yaml.load(fr); + ret = yaml.load(fr); } } return ret; @@ -1850,7 +1829,7 @@ public boolean isSleeping() { } public static class UptimeComputer { - int startTime = 0; + final int startTime; public UptimeComputer() { startTime = Time.currentTimeSecs(); @@ -1862,10 +1841,10 @@ public int upTime() { } private static class JarConfigReader { - private Yaml yaml; + private final Yaml yaml; private Map defaultsConf; private Map stormConf; - private File file; + private final File file; JarConfigReader(Yaml yaml, Map defaultsConf, Map stormConf, File file) { this.yaml = yaml; @@ -1903,13 +1882,13 @@ private void readArchive(ZipFile zipFile) throws IOException { if (!entry.isDirectory()) { if (defaultsConf == null && entry.getName().equals("defaults.yaml")) { try (InputStreamReader isr = new InputStreamReader(zipFile.getInputStream(entry))) { - defaultsConf = (Map) yaml.load(isr); + defaultsConf = yaml.load(isr); } } if (stormConf == null && entry.getName().equals("storm.yaml")) { try (InputStreamReader isr = new InputStreamReader(zipFile.getInputStream(entry))) { - 
stormConf = (Map) yaml.load(isr); + stormConf = yaml.load(isr); } } } @@ -1918,7 +1897,7 @@ private void readArchive(ZipFile zipFile) throws IOException { } /** - * Create a map of forward edges for bolts in a topology. Note that spouts can be source but not a target in + * Create a map of forward edges for bolts in a topology. Note that spouts can be a source but not a target in * the edge. The mapping contains ids of spouts and bolts. * * @param topology StormTopology to examine. @@ -1928,11 +1907,10 @@ private static Map> getStormTopologyForwardGraph(StormTopolo Map> edgesOut = new HashMap<>(); if (topology.get_bolts() != null) { - topology.get_bolts().entrySet().forEach(entry -> { - if (!Utils.isSystemId(entry.getKey())) { - entry.getValue().get_common().get_inputs().forEach((k, v) -> { - edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey()); - }); + topology.get_bolts().forEach((key, value) -> { + if (!Utils.isSystemId(key)) { + value.get_common().get_inputs().forEach( + (k, v) -> edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(key)); } }); } diff --git a/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java b/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java index 9fbe0eb0a86..4fb6a660633 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/persistence/SimpleWindowPartitionCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -180,16 +180,19 @@ public static class SimpleWindowPartitionCacheBuilder implements WindowPar private long maximumSize; private RemovalListener removalListener; + @Override public SimpleWindowPartitionCacheBuilder maximumSize(long size) { maximumSize = size; return this; } + @Override public SimpleWindowPartitionCacheBuilder removalListener(RemovalListener listener) { removalListener = listener; return this; } + @Override public SimpleWindowPartitionCache build(CacheLoader loader) { return new SimpleWindowPartitionCache<>(maximumSize, removalListener, loader); } diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 0e4acdc3a0a..9d2ffc15862 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) * under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -60,7 +60,7 @@ public interface AdminCommand { void run(String [] args, Map conf, String command) throws Exception; /** - * Print a help message to out. typically this should be in the form of. + * Print a help message to stdout. Typically, this should be in the form of. 
* command arguments: * description of command * argument - description @@ -75,7 +75,7 @@ public void run(String[] args, Map conf, String command) throws IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); - Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key)); + Set blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(ConfigUtils::getIdFromBlobKey); Set activeTopologyIds = new HashSet<>(stormClusterState.activeStorms()); Sets.SetView diffTopology = Sets.difference(activeTopologyIds, blobStoreTopologyIds); LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]", @@ -119,7 +119,7 @@ public void printCliHelp(String command, PrintStream out) { /** * Print value in a human readable format. * @param value what to print. - * @return a human readable string + * @return a human-readable string */ public static String prettyPrint(TBase value) { StringBuilder builder = new StringBuilder(); diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index cbbc2d35d1c..ca9732eef00 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -1055,6 +1055,7 @@ public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heatbeat) th } + @Override public void processWorkerMetrics(WorkerMetrics metrics) throws TException { getNimbus().processWorkerMetrics(metrics); } diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a8f519d6453..7852f027070 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. * See the NOTICE file distributed with this work for additional information regarding copyright ownership. * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); @@ -75,14 +75,13 @@ * 2. The USER sets the ACLs, and the blob access is validated against these ACLs. * 3. The SUPERVISOR interacts with nimbus through the NimbusBlobStore Client API to download the blobs. * The supervisors principal should match the set of users configured into SUPERVISOR_ADMINS. - * Here, the PrincipalToLocalPlugin takes care of mapping the principal to user name before the ACL validation. + * Here, the PrincipalToLocalPlugin takes care of mapping the principal to username before the ACL validation. 
*/ public class LocalFsBlobStore extends BlobStore { public static final Logger LOG = LoggerFactory.getLogger(LocalFsBlobStore.class); private static final String DATA_PREFIX = "data_"; private static final String META_PREFIX = "meta_"; private static final String BLOBSTORE_SUBTREE = "/blobstore/"; - private final int allPermissions = READ | WRITE | ADMIN; protected BlobStoreAclHandler aclHandler; private NimbusInfo nimbusInfo; private FileBlobStoreImpl fbs; @@ -208,6 +207,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject KeyAlreadyExistsException { LOG.debug("Creating Blob for key {}", key); validateKey(key); + int allPermissions = READ | WRITE | ADMIN; aclHandler.normalizeSettableBlobMeta(key, meta, who, allPermissions); BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl()); aclHandler.hasPermissions(meta.get_acl(), allPermissions, who, key); @@ -222,9 +222,7 @@ public AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject outputStream = null; this.stormClusterState.setupBlob(key, this.nimbusInfo, getVersionForKey(key, this.nimbusInfo, zkClient)); return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX + key, true)); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (KeyNotFoundException e) { + } catch (IOException | KeyNotFoundException e) { throw new RuntimeException(e); } finally { if (outputStream != null) { @@ -299,7 +297,7 @@ public ReadableBlobMeta getBlobMeta(String key, Subject who) throws Authorizatio } /** - * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi. + * Sets leader elector (only used by LocalFsBlobStore to help sync blobs between Nimbi). */ @Override public void setLeaderElector(ILeaderElector leaderElector) { @@ -412,7 +410,7 @@ public void shutdown() { zkClient.close(); } if (timer != null) { - timer.cancel();; + timer.cancel(); } stormClusterState.disconnect(); } @@ -436,9 +434,10 @@ public int getBlobReplication(String key, Subject who) throws Exception { } @Override - public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException { - throw new UnsupportedOperationException("For local file system blob store the update blobs function does not work. " - + "Please use HDFS blob store to make this feature available."); + public int updateBlobReplication(String key, int replication, Subject who) { + throw new UnsupportedOperationException( + "For local file system blob store the update blobs function does not work. 
" + + "Please use HDFS blob store to make this feature available."); } //This additional check and download is for nimbus high availability in case you have more than one nimbus diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 18fab2da177..3761bae3f28 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -1185,6 +1185,7 @@ static class DynamicState { this.slotMetrics = slotMetrics; } + @Override public String toString() { StringBuffer sb = new StringBuffer(); sb.append(state); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByConnectionCount.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByConnectionCount.java index 043e853d25e..699af94a574 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByConnectionCount.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByConnectionCount.java @@ -50,6 +50,7 @@ public ExecSorterByConnectionCount(TopologyDetails topologyDetails) { * @param unassignedExecutors an unmodifiable set of executors that need to be scheduled. * @return a list of executors in sorted order for scheduling. */ + @Override public List sortExecutors(Set unassignedExecutors) { Map componentMap = topologyDetails.getUserTopolgyComponents(); // excludes system components LinkedHashSet orderedExecutorSet = new LinkedHashSet<>(); // in insert order diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java index f17b94c9f4f..6aa3b605c4d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java @@ -54,6 +54,7 @@ public ExecSorterByProximity(TopologyDetails topologyDetails) { * @param unassignedExecutors an unmodifiable set of executors that need to be scheduled. * @return a list of executors in sorted order for scheduling. 
*/ + @Override public List sortExecutors(Set unassignedExecutors) { Map componentMap = topologyDetails.getUserTopolgyComponents(); // excludes system components LinkedHashSet orderedExecutorSet = new LinkedHashSet<>(); // in insert order diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java index 7ff4b05f35e..7f892dd70d4 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java @@ -587,6 +587,7 @@ private Map getScheduledExecCntByRackId() { * * @return a sorted list of racks */ + @Override public List getSortedRacks() { final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources(); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java index 64f9f08d18f..cdf50ba09ac 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java @@ -185,7 +185,7 @@ protected Iterable sortObjectResources( *
<li> * The tie between two nodes with same resource availability is broken by using the node with lower minimum * percentage used. This comparison was used in {@link #sortObjectResourcesDefault(ObjectResourcesSummary, ExistingScheduleFunc)} - * but here it is made subservient to modified resource availbility used in + * but here it is made subservient to modified resource availability used in * {@link #sortObjectResourcesGeneric(ObjectResourcesSummary, ExecutorDetails, ExistingScheduleFunc)}. * *
<li> @@ -384,7 +384,7 @@ private Iterable sortObjectResourcesDefault( *
<p>
2) the subordinate/subservient resource availability percentage of a node in descending * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this - * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of + * calculation, nodes that have more balanced resource availability. So we will be less likely to pick a node that has a lot of * one resource but a low amount of another. * * @param availHosts a collection of all the hosts we want to sort @@ -431,7 +431,7 @@ private Iterable sortHosts( *
<p>
2) the subordinate/subservient resource availability percentage of a node in descending * order We calculate the resource availability percentage by dividing the resource availability that have exhausted or little of one of * the resources mentioned above will be ranked after on the node by the resource availability of the entire rack By doing this - * calculation, nodes nodes that have more balanced resource availability. So we will be less likely to pick a node that have a lot of + * calculation, nodes that have more balanced resource availability. So we will be less likely to pick a node that has a lot of * one resource but a low amount of another. * * @param availRasNodes a list of all the nodes we want to sort @@ -640,7 +640,7 @@ private ObjectResourcesSummary createClusterSummarizedResources() { clusterResourcesSummary.getAvailableResourcesOverall(), clusterResourcesSummary.getTotalResourcesOverall(), clusterResourcesSummary.getObjectResources().size(), - rackIdToHosts.values().stream().mapToInt(x -> x.size()).sum()); + rackIdToHosts.values().stream().mapToInt(Set::size).sum()); return clusterResourcesSummary; } @@ -674,6 +674,7 @@ public Map getScheduledExecCntByRackId() { * * @return an iterable of sorted racks */ + @Override public Iterable getSortedRacks() { final ObjectResourcesSummary clusterResourcesSummary = createClusterSummarizedResources(); @@ -702,7 +703,7 @@ public List hostnameToNodes(String hostname) { } /** - * interface for calculating the number of existing executors scheduled on a object (rack or node). + * Interface for calculating the number of existing executors scheduled on an object (rack or node). */ public interface ExistingScheduleFunc { int getNumExistingSchedule(String objectId); diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java index fce3065b991..1f34ce40a32 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -72,7 +73,7 @@ public static Object[] data() { private static final Logger LOG = LoggerFactory.getLogger(TestConstraintSolverStrategy.class); private static final int MAX_TRAVERSAL_DEPTH = 2000; private static final int NORMAL_BOLT_PARALLEL = 11; - //Dropping the parallelism of the bolts to 3 instead of 11 so we can find a solution in a reasonable amount of work when backtracking. + //Dropping the parallelism of the bolts to 3 instead of 11, so we can find a solution in a reasonable amount of work when backtracking. 
private static final int BACKTRACK_BOLT_PARALLEL = 3; private static final int CO_LOCATION_CNT = 2; @@ -182,7 +183,7 @@ private void setConstraintConfig(List> constraints, Map(spreads.keySet())); } } @@ -296,9 +297,9 @@ public void testMissingConfig() { Map conf = new HashMap<>(); conf.put(Config.TOPOLOGY_RAS_CONSTRAINTS, jsonValue); new ConstraintSolverConfig("test-topoid-2", conf, new HashSet<>()); - new ConstraintSolverConfig("test-topoid-3", conf, new HashSet<>(Arrays.asList("comp-x"))); - new ConstraintSolverConfig("test-topoid-4", conf, new HashSet<>(Arrays.asList("comp-1"))); - new ConstraintSolverConfig("test-topoid-5", conf, new HashSet<>(Arrays.asList("comp-1, comp-x"))); + new ConstraintSolverConfig("test-topoid-3", conf, new HashSet<>(Collections.singletonList("comp-x"))); + new ConstraintSolverConfig("test-topoid-4", conf, new HashSet<>(Collections.singletonList("comp-1"))); + new ConstraintSolverConfig("test-topoid-5", conf, new HashSet<>(Collections.singletonList("comp-1, comp-x"))); } } @@ -321,16 +322,12 @@ public void testNewConstraintFormat() { Object jsonValue = JSONValue.parse(s); Map config = Utils.readDefaultConfig(); config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, jsonValue); - Set allComps = new HashSet<>(); - allComps.addAll(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5")); + Set allComps = new HashSet<>(Arrays.asList("comp-1", "comp-2", "comp-3", "comp-4", "comp-5")); ConstraintSolverConfig constraintSolverConfig = new ConstraintSolverConfig("test-topoid-1", config, allComps); - Set expectedSetComp1 = new HashSet<>(); - expectedSetComp1.addAll(Arrays.asList("comp-2", "comp-3")); - Set expectedSetComp2 = new HashSet<>(); - expectedSetComp2.addAll(Arrays.asList("comp-1", "comp-4")); - Set expectedSetComp3 = new HashSet<>(); - expectedSetComp3.addAll(Arrays.asList("comp-1", "comp-5")); + Set expectedSetComp1 = new HashSet<>(Arrays.asList("comp-2", "comp-3")); + Set expectedSetComp2 = new HashSet<>(Arrays.asList("comp-1", "comp-4")); + Set expectedSetComp3 = new HashSet<>(Arrays.asList("comp-1", "comp-5")); Assert.assertEquals("comp-1 incompatible components", expectedSetComp1, constraintSolverConfig.getIncompatibleComponentSets().get("comp-1")); Assert.assertEquals("comp-2 incompatible components", expectedSetComp2, constraintSolverConfig.getIncompatibleComponentSets().get("comp-2")); Assert.assertEquals("comp-3 incompatible components", expectedSetComp3, constraintSolverConfig.getIncompatibleComponentSets().get("comp-3")); @@ -352,6 +349,7 @@ public void testConstraintSolverForceBacktrackWithSpreadCoLocation() { } ConstraintSolverStrategy cs = new ConstraintSolverStrategy() { + @Override protected void prepareForScheduling(Cluster cluster, TopologyDetails topologyDetails) { super.prepareForScheduling(cluster, topologyDetails); @@ -510,7 +508,7 @@ public void testIntegrationWithRAS() { addConstraints("bolt-1", "bolt-1", constraints); addConstraints("bolt-1", "bolt-2", constraints); - Map spreads = new HashMap(); + Map spreads = new HashMap<>(); spreads.put("spout-0", 1); spreads.put("bolt-1", 10); @@ -569,7 +567,7 @@ public void testZeroExecutorScheduling() { Cluster cluster = makeCluster(new Topologies(topo)); cs.schedule(cluster, topo); LOG.info("********************* Scheduling Zero Unassigned Executors *********************"); - cs.schedule(cluster, topo); // reschedule a fully schedule topology + cs.schedule(cluster, topo); // reschedule a fully scheduled topology LOG.info("********************* End of Scheduling Zero Unassigned Executors 
*********************"); }
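
The Utils.reverseMap hunks above collapse the get/null-check/put sequence into Map.computeIfAbsent. A minimal, self-contained sketch of that idiom; the class name and sample data below are illustrative, not part of the patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReverseMapSketch {
    // Reverse a Map<K, V> into a Map<V, List<K>>: each value maps to the keys that held it.
    static <K, V> Map<V, List<K>> reverse(Map<K, V> map) {
        Map<V, List<K>> reversed = new HashMap<>();
        if (map == null) {
            return reversed;
        }
        for (Map.Entry<K, V> entry : map.entrySet()) {
            // computeIfAbsent creates the bucket on first use, replacing the manual null check.
            reversed.computeIfAbsent(entry.getValue(), v -> new ArrayList<>()).add(entry.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<Integer, String> tasks = new HashMap<>();
        tasks.put(1, "bolt-a");
        tasks.put(2, "bolt-a");
        tasks.put(3, "bolt-b");
        // Prints something like {bolt-a=[1, 2], bolt-b=[3]}
        System.out.println(reverse(tasks));
    }
}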
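The integerDivided hunk only modernizes the TreeMap construction, but the arithmetic it wraps is worth a worked example. A standalone sketch mirroring the method body shown in the diff; the wrapper class and main method are illustrative:

import java.util.TreeMap;

public class IntegerDividedSketch {
    // Split `sum` into `numPieces` near-equal integer pieces; the result maps
    // piece size -> count of pieces of that size (mirrors Utils.integerDivided).
    static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
        int base = sum / numPieces;
        int numInc = sum % numPieces;      // pieces that get one extra unit
        int numBases = numPieces - numInc; // pieces that stay at the base size
        TreeMap<Integer, Integer> ret = new TreeMap<>();
        ret.put(base, numBases);
        if (numInc != 0) {
            ret.put(base + 1, numInc);
        }
        return ret;
    }

    public static void main(String[] args) {
        // 10 split into 3 pieces: two pieces of size 3 and one of size 4 -> {3=2, 4=1}
        System.out.println(integerDivided(10, 3));
    }
}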
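Several hunks (the asyncLoop uncaught-exception handler, mkSuicideFn) replace single-method anonymous classes with lambdas. A minimal sketch of that conversion, assuming a stand-in handler that logs to stderr instead of calling Utils.exitProcess:

public class HandlerSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            throw new RuntimeException("boom");
        });
        // Thread.UncaughtExceptionHandler is a functional interface, so the
        // anonymous class in the old code collapses to a two-argument lambda.
        thread.setUncaughtExceptionHandler((t, e) ->
                System.err.println("Async loop died on " + t.getName() + ": " + e));
        thread.start();
        thread.join();
    }
}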