Skip to content

Commit

Permalink
KAFKA-17794: Add some formatting safeguards for KIP-853 (#17504)
Browse files Browse the repository at this point in the history
KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is
no longer statically determined by the controller.quorum.voters configuration. Instead, it
is contained in the storage directories of each controller and broker.

Users of dynamic quorums must format at least one controller storage directory with either
the --initial-controllers or --standalone flags.  If they fail to do this, no quorum can be
established. This PR changes the storage tool to warn about the case where a KIP-853 flag has
not been supplied to format a KIP-853 controller. (Note that broker storage directories
can continue to be formatted without a KIP-853 flag.)

There are cases where we don't want to specify initial voters when formatting a controller. One
example is where we format a single controller with --standalone, and then dynamically add 4
more controllers with no initial topology. In this case, we want the 4 later controllers to grab
the quorum topology from the initial one. To support this case, this PR adds the
--no-initial-controllers flag.

Reviewers: José Armando García Sancio <[email protected]>, Federico Valeri <[email protected]>
  • Loading branch information
cmccabe committed Oct 21, 2024
1 parent 7842e25 commit c821449
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 26 deletions.
41 changes: 29 additions & 12 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.tools

import kafka.server.KafkaConfig
Expand All @@ -31,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
import org.apache.kafka.raft.DynamicVoters
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.ReplicationConfigs

Expand Down Expand Up @@ -126,9 +125,20 @@ object StorageTool extends Logging {
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
}
Option(namespace.getString("initial_controllers")).
foreach(v => formatter.setInitialVoters(DynamicVoters.parse(v)))
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
if (namespace.getBoolean("standalone")) {
formatter.setInitialVoters(createStandaloneDynamicVoters(config))
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
if (!namespace.getBoolean("no_initial_controllers")) {
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
if (config.quorumVoters.isEmpty) {
if (!formatter.initialVoters().isPresent()) {
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
" is not set on this controller, you must specify one of the following: " +
"--standalone, --initial-controllers, or --no-initial-controllers.");
}
}
}
}
Option(namespace.getList("add_scram")).
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
Expand All @@ -140,7 +150,7 @@ object StorageTool extends Logging {
config: KafkaConfig
): DynamicVoters = {
if (!config.processRoles.contains(ProcessRole.ControllerRole)) {
throw new TerseFailure("You cannot use --standalone on a broker node.")
throw new TerseFailure("You can only use --standalone on a controller.")
}
if (config.effectiveAdvertisedControllerListeners.isEmpty) {
throw new RuntimeException("No controller listeners found.")
Expand Down Expand Up @@ -191,13 +201,20 @@ object StorageTool extends Logging {
help("The setting to use for a specific feature, in feature=level format. For example: `kraft.version=1`.").
action(append())
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s").
help("Used to initialize a single-node quorum controller quorum.").
action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I").
help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n").
action(store())
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
.help("Used to initialize a controller as a single-node dynamic quorum.")
.action(storeTrue())

reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
.help("Used to initialize a server without a dynamic quorum topology.")
.action(storeTrue())

reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
"format all nodes. For example:\n[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
.action(store())
parser.parseArgs(args)
}

Expand Down
61 changes: 56 additions & 5 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ Found problem:
defaultDynamicQuorumProperties.setProperty("process.roles", "controller")
defaultDynamicQuorumProperties.setProperty("node.id", "0")
defaultDynamicQuorumProperties.setProperty("controller.listener.names", "CONTROLLER")
defaultDynamicQuorumProperties.setProperty("controller.quorum.voters", "0@localhost:9093")
defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://127.0.0.1:9093")
defaultDynamicQuorumProperties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://:9093")
defaultDynamicQuorumProperties.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG , "true")

Expand Down Expand Up @@ -378,7 +379,7 @@ Found problem:
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
assertEquals("You cannot use --standalone on a broker node.",
assertEquals("You can only use --standalone on a controller.",
assertThrows(classOf[TerseFailure],
() => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
}
Expand Down Expand Up @@ -437,11 +438,61 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}

@ParameterizedTest
@ValueSource(strings = Array("controller", "broker,controller"))
def testFormatWithoutStaticQuorumFailsWithoutInitialControllersOnController(processRoles: String): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
if (processRoles.contains("broker")) {
properties.setProperty("listeners", "PLAINTEXT://:9092,CONTROLLER://:9093")
properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093")
}
properties.setProperty("process.roles", processRoles)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Because controller.quorum.voters is not set on this controller, you must " +
"specify one of the following: --standalone, --initial-controllers, or " +
"--no-initial-controllers.",
assertThrows(classOf[TerseFailure],
() => runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--release-version", "3.9-IV0"))).getMessage)
}

@Test
def testBootstrapScramRecords(): Unit = {
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties,
Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}

@Test
def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("listeners", "PLAINTEXT://:9092")
properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092")
properties.setProperty("process.roles", "broker")
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq("--release-version", "3.9-IV0")))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}

@Test
def testBootstrapScramRecords(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String](
Expand All @@ -468,7 +519,7 @@ Found problem:
def testScramRecordsOldReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String](
Expand Down
4 changes: 2 additions & 2 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -3818,9 +3818,9 @@ <h5 class="anchor-heading"><a id="kraft_storage_voters" class="anchor-link"></a>
In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the replica id, 3Db5QLSqSZieL3rJBUUegA is the replica directory id, controller-0 is the replica's host and 1234 is the replica's port.

<h5 class="anchor-heading"><a id="kraft_storage_observers" class="anchor-link"></a><a href="#kraft_storage_observers">Formatting Brokers and New Controllers</a></h5>
When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the <code>kafka-storage.sh format</code> command without the --standalone or --initial-controllers flags.
When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the <code>kafka-storage.sh format</code> command with the --no-initial-controllers flag.

<pre><code class="language-bash">$ bin/kafka-storage format --cluster-id &lt;cluster-id&gt; --config server.properties</code></pre>
<pre><code class="language-bash">$ bin/kafka-storage.sh format --cluster-id &lt;cluster-id&gt; --config server.properties --no-initial-controllers</code></pre>

<h4 class="anchor-heading"><a id="kraft_reconfig" class="anchor-link"></a><a href="#kraft_reconfig">Controller membership changes</a></h4>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
return this;
}

public Formatter setInitialVoters(DynamicVoters initialControllers) {
public Formatter setInitialControllers(DynamicVoters initialControllers) {
this.initialControllers = Optional.of(initialControllers);
return this;
}

public Optional<DynamicVoters> initialVoters() {
return initialControllers;
}

boolean hasDynamicQuorum() {
if (initialControllers.isPresent()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
}
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialVoters(DynamicVoters.
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.run();
assertEquals(Arrays.asList(
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Excep
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialVoters(DynamicVoters.
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
assertTrue(formatter1.formatter.hasDynamicQuorum());
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
Expand Down Expand Up @@ -433,7 +433,7 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
formatter1.formatter.setInitialVoters(DynamicVoters.
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertEquals("kraft.version could not be set to 1 because it depends on " +
Expand Down
9 changes: 6 additions & 3 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,9 +914,12 @@ def start_node(self, node, timeout_sec=60, **kwargs):
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
cmd += " --feature kraft.version=1"
if not self.standalone_controller_bootstrapped and self.node_quorum_info.has_controller_role:
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
if self.node_quorum_info.has_controller_role:
if self.standalone_controller_bootstrapped:
cmd += " --no-initial-controllers"
else:
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)

Expand Down

0 comments on commit c821449

Please sign in to comment.