From 02d3f40056e37d395dbd182d3d819d0be67d8d55 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Wed, 21 Jan 2026 16:53:01 +0200
Subject: [PATCH 1/5] test(metadata:diskless): improve test coverage for broker
fencing and unregister scenarios
- Add _noRacks and _withRacks test variants for consistent coverage
- Fix tests that assumed broker 0 was always the leader
- Get actual leader from partition registration before fencing/unregistering
- Use dynamic assertions based on actual partition state
- Improve assertion error messages for clarity
---
.../ReplicationControlManagerTest.java | 136 +++++++++++++++---
1 file changed, 119 insertions(+), 17 deletions(-)
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 72b58a0a53d..aa678568d49 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -4234,9 +4234,9 @@ public void testReassignDisklessPartitions() {
}
@Test
- public void testNoLeaderElectionOnBrokerFenced() {
+ public void testNoLeaderElectionOnBrokerFenced_noRacks() {
// As there are no replicas to elect from, the leader should go offline but no new leader should be elected.
- // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained.
+ // Unmanaged diskless topics register a single replica as the leader, but it's not maintained.
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setDisklessStorageSystemEnabled(true)
.build();
@@ -4253,25 +4253,29 @@ public void testNoLeaderElectionOnBrokerFenced() {
);
final Uuid topicId = createResult.topicId();
+ // Get the actual leader before fencing
+ PartitionRegistration partitionBefore = replication.getPartition(topicId, 0);
+ int leader = partitionBefore.leader;
+
List<ApiMessageAndVersion> records = new ArrayList<>();
- replication.handleBrokerFenced(0, records);
+ replication.handleBrokerFenced(leader, records);
ctx.replay(records);
PartitionRegistration partition = replication.getPartition(topicId, 0);
assertNotNull(partition, "Partition should exist after leader fencing");
- assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader");
+ assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica");
assertEquals(-1, partition.leader, "Leader should be offline after fencing");
}
@Test
- public void testNoReplicaChangeOnShutdown() {
+ public void testNoLeaderElectionOnBrokerFenced_withRacks() {
// As there are no replicas to elect from, the leader should go offline but no new leader should be elected.
- // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained.
+ // Unmanaged diskless topics register a single replica as the leader, but it's not maintained.
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setDisklessStorageSystemEnabled(true)
.build();
ReplicationControlManager replication = ctx.replicationControl;
- ctx.registerBrokers(0, 1, 2);
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createResult = ctx.createTestTopic(
@@ -4283,16 +4287,88 @@ public void testNoReplicaChangeOnShutdown() {
);
final Uuid topicId = createResult.topicId();
+ // Get the actual leader before fencing
+ PartitionRegistration partitionBefore = replication.getPartition(topicId, 0);
+ int leader = partitionBefore.leader;
+
+ List<ApiMessageAndVersion> records = new ArrayList<>();
- replication.handleBrokerShutdown(0, true, records);
+ replication.handleBrokerFenced(leader, records);
ctx.replay(records);
PartitionRegistration partition = replication.getPartition(topicId, 0);
assertNotNull(partition, "Partition should exist after leader fencing");
- assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader");
+ assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica");
assertEquals(-1, partition.leader, "Leader should be offline after fencing");
}
+ @Test
+ public void testNoReplicaChangeOnShutdown_noRacks() {
+ // As there are no replicas to elect from, the leader should go offline but no new leader should be elected.
+ // Unmanaged diskless topics register a single replica as the leader, but it's not maintained.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ 1,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ // Get the actual leader before shutdown
+ PartitionRegistration partitionBefore = replication.getPartition(topicId, 0);
+ int leader = partitionBefore.leader;
+
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ replication.handleBrokerShutdown(leader, true, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after leader shutdown");
+ assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica");
+ assertEquals(-1, partition.leader, "Leader should be offline after shutdown");
+ }
+
+ @Test
+ public void testNoReplicaChangeOnShutdown_withRacks() {
+ // As there are no replicas to elect from, the leader should go offline but no new leader should be elected.
+ // Unmanaged diskless topics register a single replica as the leader, but it's not maintained.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ 1,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ // Get the actual leader before shutdown
+ PartitionRegistration partitionBefore = replication.getPartition(topicId, 0);
+ int leader = partitionBefore.leader;
+
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ replication.handleBrokerShutdown(leader, true, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after leader shutdown");
+ assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica");
+ assertEquals(-1, partition.leader, "Leader should be offline after shutdown");
+ }
+
@Test
void testDisklessMarksLeaderOfflineOnUnregister_noRacks() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
@@ -4312,18 +4388,31 @@ void testDisklessMarksLeaderOfflineOnUnregister_noRacks() {
);
final Uuid topicId = createResult.topicId();
+ // Identify partitions that have broker 0 as leader before unregistering
+ Set<Integer> partitionsWithBroker0AsLeader = new HashSet<>();
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ PartitionRegistration partition = replication.getPartition(topicId, partitionId);
+ if (partition.leader == 0) {
+ partitionsWithBroker0AsLeader.add(partitionId);
+ }
+ }
+
List<ApiMessageAndVersion> records = new ArrayList<>();
replication.handleBrokerUnregistered(0, 100, records);
ctx.replay(records);
// All partitions should remain present and keep the original replica/ISR,
- // only the leader should be marked offline.
+ // only leaders on broker 0 should be marked offline.
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
PartitionRegistration partition = replication.getPartition(topicId, partitionId);
assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration");
- assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId);
- assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId);
- assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId);
+ assertEquals(1, partition.replicas.length, "Replicas should have 1 element for partition " + partitionId);
+ assertEquals(1, partition.isr.length, "ISR should have 1 element for partition " + partitionId);
+ if (partitionsWithBroker0AsLeader.contains(partitionId)) {
+ assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId + " (was on broker 0)");
+ } else {
+ assertTrue(partition.leader >= 0, "Leader should remain online for partition " + partitionId + " (was not on broker 0)");
+ }
}
}
@@ -4346,18 +4435,31 @@ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() {
);
final Uuid topicId = createResult.topicId();
+ // Identify partitions that have broker 0 as leader before unregistering
+ Set<Integer> partitionsWithBroker0AsLeader = new HashSet<>();
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ PartitionRegistration partition = replication.getPartition(topicId, partitionId);
+ if (partition.leader == 0) {
+ partitionsWithBroker0AsLeader.add(partitionId);
+ }
+ }
+
List<ApiMessageAndVersion> records = new ArrayList<>();
replication.handleBrokerUnregistered(0, 100, records);
ctx.replay(records);
// All partitions should remain present and keep the original replica/ISR,
- // only the leader should be marked offline.
+ // only leaders on broker 0 should be marked offline.
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
PartitionRegistration partition = replication.getPartition(topicId, partitionId);
assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration");
- assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId);
- assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId);
- assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId);
+ assertEquals(1, partition.replicas.length, "Replicas should have 1 element for partition " + partitionId);
+ assertEquals(1, partition.isr.length, "ISR should have 1 element for partition " + partitionId);
+ if (partitionsWithBroker0AsLeader.contains(partitionId)) {
+ assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId + " (was on broker 0)");
+ } else {
+ assertTrue(partition.leader >= 0, "Leader should remain online for partition " + partitionId + " (was not on broker 0)");
+ }
}
}
From 15fe52678acd7168fdcd4d9539844f3b03e33150 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Tue, 20 Jan 2026 13:29:25 +0200
Subject: [PATCH 2/5] feat(controller:diskless): add server config for managed
replicas
Add diskless.managed.rf.enable config (default: false) to control whether
diskless topics use managed replicas with RF=rack_count or legacy RF=1.
This config only affects topic creation. When enabled, new diskless topics
will be created with one replica per rack using standard KRaft placement.
Part of Phase 1: Diskless Managed Replicas
(See #478 docs/inkless/ts-unification/DISKLESS_MANAGED_RF.md)
# Conflicts:
# core/src/main/scala/kafka/server/ControllerServer.scala
# core/src/main/scala/kafka/server/KafkaConfig.scala
# metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
# server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
---
.../main/scala/kafka/server/ControllerServer.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 1 +
.../apache/kafka/controller/QuorumController.java | 8 ++++++++
.../apache/kafka/server/config/ServerConfigs.java | 14 ++++++++++++++
4 files changed, 24 insertions(+)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index b0755bd361a..db894c98ec6 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -240,6 +240,7 @@ class ControllerServer(
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultDisklessEnable(config.logDisklessEnable).
setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled).
+ setDisklessManagedReplicasEnabled(config.disklessManagedReplicasEnabled).
setClassicRemoteStorageForceEnabled(config.classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(config.classicRemoteStorageForceExcludeTopicRegexes).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 29b3c2c1c98..1ce736bb363 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -425,6 +425,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
+ val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG)
val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG)
val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] =
getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index f8110170a0e..46f26092087 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -223,6 +223,7 @@ public static class Builder {
private boolean defaultDisklessEnable = false;
private boolean disklessStorageSystemEnabled = false;
+ private boolean disklessManagedReplicasEnabled = false;
private boolean classicRemoteStorageForceEnabled = false;
private List classicRemoteStorageForceExcludeTopicRegexes = List.of();
@@ -295,6 +296,11 @@ public Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnab
return this;
}
+ public Builder setDisklessManagedReplicasEnabled(boolean disklessManagedReplicasEnabled) {
+ this.disklessManagedReplicasEnabled = disklessManagedReplicasEnabled;
+ return this;
+ }
+
public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
@@ -454,6 +460,7 @@ public QuorumController build() throws Exception {
defaultNumPartitions,
defaultDisklessEnable,
disklessStorageSystemEnabled,
+ disklessManagedReplicasEnabled,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes,
replicaPlacer,
@@ -1497,6 +1504,7 @@ private QuorumController(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean disklessStorageSystemEnabled,
+ boolean disklessManagedReplicasEnabled,
boolean classicRemoteStorageForceEnabled,
List classicRemoteStorageForceExcludeTopicRegexes,
ReplicaPlacer replicaPlacer,
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
index a875271f979..6970984e5c6 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
@@ -142,6 +142,19 @@ public class ServerConfigs {
"This should only be enabled in non-production environments for testing or migration purposes. " +
"When enabled, topics can have their diskless.enable config changed from false to true.";
+ // When enabled, diskless topics are created with RF = rack_count (one replica per rack).
+ // If brokers are registered but none have a rack configured, RF defaults to 1.
+ // If no brokers are registered, topic creation fails with BROKER_NOT_AVAILABLE.
+ // When disabled (default), diskless topics use legacy RF=1 behavior.
+ // This config only affects topic creation.
+ public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable";
+ public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false;
+ public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " +
+ "with replication factor equal to the number of racks (one replica per rack). " +
+ "If brokers are registered but none have a rack configured, RF defaults to 1. " +
+ "When disabled, diskless topics use legacy RF=1 behavior. " +
+ "This config only affects topic creation.";
+
public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG = "classic.remote.storage.force.enable";
public static final boolean CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT = false;
public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC = "Force classic topics to be created with remote.storage.enable=true, " +
@@ -202,6 +215,7 @@ public class ServerConfigs {
DISKLESS_STORAGE_SYSTEM_ENABLE_DOC)
.define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW,
DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC)
+ .define(DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, BOOLEAN, DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT, MEDIUM, DISKLESS_MANAGED_REPLICAS_ENABLE_DOC)
.define(CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, BOOLEAN, CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT, LOW,
CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC)
.define(CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG, LIST, CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_DEFAULT,
From 9a44b389dc284aba826d15736fa8019594eff963 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Wed, 21 Jan 2026 16:55:04 +0200
Subject: [PATCH 3/5] feat(metadata:diskless): implement managed replicas for
diskless topics
When diskless.managed.rf.enable=true, new diskless topics are created with
RF=rack_count using standard KRaft replica placement instead of legacy RF=1.
Changes:
- Compute RF from rack cardinality via rackCardinality()
- Use standard replicaPlacer.place() for rack-aware assignment
- Allow manual replica assignments when managed replicas enabled
- Add checkstyle suppression for extended createTopic method
Phase 1 limitations:
- Add Partitions inherits RF from existing partitions (Phase 3)
- Transformer not updated, uses legacy routing (Phase 2)
- Integration tests deferred to Phase 2
(See #478 docs/inkless/ts-unification/DISKLESS_MANAGED_RF.md)
Conflicts:
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
---
checkstyle/suppressions.xml | 6 +
.../kafka/controller/QuorumController.java | 1 +
.../controller/ReplicationControlManager.java | 69 +-
.../ReplicationControlManagerTest.java | 778 +++++++++++++++++-
4 files changed, 837 insertions(+), 17 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e73d2d0e9b2..0238cabda84 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -86,6 +86,9 @@
files="MemoryRecordsBuilder.java"/>
+
+
@@ -318,6 +321,9 @@
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
+
+
classicRemoteStorageForceExcludeTopicRegexes = List.of();
@@ -201,6 +203,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn
return this;
}
+ public Builder setDisklessManagedReplicasEnabled(boolean isDisklessManagedReplicasEnabled) {
+ this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
+ return this;
+ }
+
public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
@@ -253,6 +260,7 @@ ReplicationControlManager build() {
defaultNumPartitions,
defaultDisklessEnable,
isDisklessStorageSystemEnabled,
+ isDisklessManagedReplicasEnabled,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes,
maxElectionsPerImbalance,
@@ -332,9 +340,21 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti
*/
private final boolean defaultDisklessEnable;
+ /**
+ * When true, the diskless storage system is enabled, allowing diskless topics to be created.
+ */
private final boolean isDisklessStorageSystemEnabled;
private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy;
+ /**
+ * When true, diskless topics use managed replicas with RF = rack_count (one replica per rack).
+ * When false, diskless topics use legacy RF=1 behavior.
+ *
+ * Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
+ * RF from existing partitions (correct behavior - maintains consistency within the topic).
+ */
+ private final boolean isDisklessManagedReplicasEnabled;
+
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
@@ -425,6 +445,7 @@ private ReplicationControlManager(
int defaultNumPartitions,
boolean defaultDisklessEnable,
boolean isDisklessStorageSystemEnabled,
+ boolean isDisklessManagedReplicasEnabled,
boolean classicRemoteStorageForceEnabled,
List classicRemoteStorageForceExcludeTopicRegexes,
int maxElectionsPerImbalance,
@@ -439,6 +460,7 @@ private ReplicationControlManager(
this.defaultNumPartitions = defaultNumPartitions;
this.defaultDisklessEnable = defaultDisklessEnable;
this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled;
+ this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled;
this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy(
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes
@@ -803,9 +825,13 @@ private ApiError createTopic(ControllerRequestContext context,
"when the diskless storage system is disabled. " +
"Please enable the diskless storage system to create diskless topics.");
}
+ // Diskless RF validation: accept -1 (auto) or 1 (backward compat) only.
+ // Explicit RF > 1 rejected: users shouldn't need to know rack topology.
+ // Note: RF=1 is accepted for API backward compatibility, but when managed replicas
+ // are enabled, the actual RF is always computed from rack topology (rackCardinality).
if (Math.abs(topic.replicationFactor()) != 1) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
- "Replication factor for diskless topics must be 1 or -1 to use the default value (1).");
+ "Replication factor for diskless topics must be 1 or -1 (system-computed from rack topology).");
}
}
@@ -820,7 +846,7 @@ private ApiError createTopic(ControllerRequestContext context,
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
- if (disklessEnabled) {
+ if (disklessEnabled && !isDisklessManagedReplicasEnabled) {
return new ApiError(INVALID_REQUEST,
"A manual partition assignment cannot be specified for diskless topics.");
}
@@ -868,12 +894,19 @@ private ApiError createTopic(ControllerRequestContext context,
} else {
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
- short replicationFactor = topic.replicationFactor() == -1 ?
- defaultReplicationFactor : topic.replicationFactor();
+ short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor();
+ // For managed diskless: always use rackCardinality() regardless of requested RF.
+ // RF=1 in the request is accepted for backward compat but overridden here.
+ // Throws BrokerNotAvailableException or InvalidReplicationFactorException on failure,
+ // which are caught by the caller and converted to ApiError.
+ short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality() : 1;
+ short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor;
try {
TopicAssignment topicAssignment;
Predicate brokerFilter;
- if (!disklessEnabled) {
+ // Diskless managed-replicas is equivalent to classic topic assignment,
+ // but RF is defined by number of racks
+ if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
numPartitions,
@@ -943,6 +976,32 @@ private ApiError createTopic(ControllerRequestContext context,
return ApiError.NONE;
}
+ /**
+ * Computes the replication factor for diskless topics based on rack topology.
+ * Returns the number of distinct racks in the cluster, ensuring one replica per rack.
+ * Brokers with no rack configured are all treated as belonging to a single logical rack,
+ * so if at least one broker is registered but none have a rack configured, the result is RF=1.
+ *
+ * @return the number of distinct racks as a short
+ * @throws BrokerNotAvailableException if no brokers are registered
+ * @throws InvalidReplicationFactorException if rack count exceeds Short.MAX_VALUE
+ */
+ private short rackCardinality() {
+ final Collection<BrokerRegistration> brokerRegistrations = clusterControl.brokerRegistrations().values();
+ final long racks = brokerRegistrations.stream()
+ .map(BrokerRegistration::rack)
+ .distinct()
+ .count();
+ if (racks > Short.MAX_VALUE) {
+ // Unfeasible but technically possible scenario.
+ // Would require more than 32,767 brokers, each with a different rack
+ throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation.");
+ }
+ if (racks == 0)
+ throw new BrokerNotAvailableException("No brokers available to create diskless topic.");
+ return (short) racks;
+ }
+
private boolean disklessEnabledOnTopicCreation(final Map creationConfigs) {
final String disklessEnableConfigValue = creationConfigs.get(DISKLESS_ENABLE_CONFIG);
final boolean disklessConfigEnabled;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index aa678568d49..eb1a0f382b9 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -185,6 +185,7 @@ private static class Builder {
private final Map staticConfig = new HashMap<>();
private boolean defaultDisklessEnable = false;
private boolean disklessStorageSystemEnable = false;
+ private boolean disklessManagedReplicasEnable = false;
private boolean classicRemoteStorageForceEnabled = false;
private List classicRemoteStorageForceExcludeTopicRegexes = List.of();
@@ -223,6 +224,11 @@ Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnable) {
return this;
}
+ Builder setDisklessManagedReplicasEnabled(boolean disklessManagedReplicasEnable) {
+ this.disklessManagedReplicasEnable = disklessManagedReplicasEnable;
+ return this;
+ }
+
Builder setClassicRemoteStorageForceEnabled(final boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
@@ -241,6 +247,7 @@ ReplicationControlTestContext build() {
staticConfig,
defaultDisklessEnable,
disklessStorageSystemEnable,
+ disklessManagedReplicasEnable,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes);
}
@@ -270,6 +277,7 @@ private ReplicationControlTestContext(
Map staticConfig,
boolean defaultDisklessEnable,
boolean disklessStorageSystemEnable,
+ boolean disklessManagedReplicasEnable,
final boolean classicRemoteStorageForceEnabled,
final List classicRemoteStorageForceExcludeTopicRegexes
) {
@@ -317,6 +325,7 @@ private ReplicationControlTestContext(
setFeatureControl(featureControl).
setDefaultDisklessEnable(defaultDisklessEnable).
setDisklessStorageSystemEnabled(disklessStorageSystemEnable).
+ setDisklessManagedReplicasEnabled(disklessManagedReplicasEnable).
setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes).
build();
@@ -3786,11 +3795,11 @@ public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, St
}
@ParameterizedTest
- @CsvSource({
+ @CsvSource(value = {
"true,true",
- "true,",
+ "true,NULL",
"false,true",
- })
+ }, nullValues = "NULL")
public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setDefaultDisklessEnable(logDisklessEnableServerConfig)
@@ -3894,11 +3903,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi
}
@ParameterizedTest
- @CsvSource({
+ @CsvSource(value = {
"true,true",
- "true,",
+ "true,NULL",
"false,true",
- })
+ }, nullValues = "NULL")
public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setDefaultDisklessEnable(logDisklessEnableServerConfig)
@@ -4039,13 +4048,13 @@ public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short rep
}
@ParameterizedTest
- @CsvSource({
+ @CsvSource(value = {
"true,false",
- "true,"
+ "true,NULL"
// This case is not valid because no internal topic should be explicitly created with diskless enabled.
// Tested in testInvalidDisklessTopicCreationForInternalTopics
// "false,true",
- })
+ }, nullValues = "NULL")
public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
// Given a setup with diskless defined at the server level
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
@@ -4129,10 +4138,10 @@ public void testInvalidDisklessTopicCreationForInternalTopics() {
}
@ParameterizedTest
- @CsvSource({
+ @CsvSource(value = {
"false,true",
- "true,"
- })
+ "true,NULL"
+ }, nullValues = "NULL")
public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
// Given a setup with diskless defined at the server level
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
@@ -4481,4 +4490,749 @@ void testManualReplicaAssignmentsShouldBeRejected() {
}
}
+ @Nested
+ // Tests Diskless managed-replicas
+ class DisklessManagedReplicasTests {
+ @ParameterizedTest
+ @CsvSource(value = {
+ "false,false",
+ "false,NULL",
+ "true,false",
+ }, nullValues = "NULL")
+ public void testCreatesClassicTopicWhenDisklessDisabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDefaultDisklessEnable(logDisklessEnableServerConfig)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given a request to create a kafka topic with diskless disabled
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ if (disklessEnableTopicConfig != null) {
+ creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue(disklessEnableTopicConfig));
+ }
+
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) -1)
+ .setConfigs(creatableTopicConfigs));
+
+ // Given all brokers unfenced
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // When creating a topic with diskless disabled
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
+ // Then the topic creation should succeed as a classic topic with RF=3
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 3).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result.response().topics().find("foo").topicId()));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+ final List disklessConfigRecords = result.records().stream()
+ .filter(m -> m.message() instanceof ConfigRecord)
+ .map(m -> (ConfigRecord) m.message())
+ .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))
+ .toList();
+ if (!disklessConfigRecords.isEmpty()) {
+ // Then always diskless is disabled
+ assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false")));
+ }
+
+ // Given the topic is registered
+ ctx.replay(result.records());
+ assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}).
+ setDirectories(new Uuid[] {
+ Uuid.fromString("TESTBROKER00001DIRAAAA"),
+ Uuid.fromString("TESTBROKER00002DIRAAAA"),
+ Uuid.fromString("TESTBROKER00000DIRAAAA")
+ }).
+ setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(),
+ replicationControl.getPartition(
+ ((TopicRecord) result.records().get(0).message()).topicId(), 0));
+
+ // When creating the same topic again while it already exists
+ ControllerResult result1 =
+ replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
+ CreateTopicsResponseData expectedResponse1 = new CreateTopicsResponseData();
+ // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error
+ expectedResponse1.topics().add(new CreatableTopicResult().setName("foo").
+ setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
+ setErrorMessage("Topic 'foo' already exists."));
+ assertEquals(expectedResponse1, result1.response());
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {
+ "true,true",
+ "true,NULL",
+ "false,true",
+ }, nullValues = "NULL")
+ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDefaultDisklessEnable(logDisklessEnableServerConfig)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given a request to create a kafka topic with diskless enabled
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ if (disklessEnableTopicConfig != null) {
+ creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue(disklessEnableTopicConfig));
+ }
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) -1)
+ .setConfigs(creatableTopicConfigs));
+
+ // When creating a topic without brokers available
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should fail with BROKER_NOT_AVAILABLE error
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+ setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()).
+ setErrorMessage("No brokers available to create diskless topic."));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+
+ // Given brokers are registered
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0);
+
+ // When creating a topic with diskless enabled
+ ControllerResult result2 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should succeed, regardless of fenced brokers
+ CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
+ expectedResponse2.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 1).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result2.response().topics().find("foo").topicId()));
+ CreateTopicsResponseData response = result2.response();
+ assertEquals(expectedResponse2, withoutConfigs(response));
+
+ // Given all brokers unfenced
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // When creating a topic with diskless enabled
+ ControllerResult result3 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should succeed, regardless of the RF
+ CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
+ expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 1).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result3.response().topics().find("foo").topicId()));
+ assertEquals(expectedResponse3, withoutConfigs(result3.response()));
+ final List disklessConfigRecords = result3.records().stream()
+ .filter(m -> m.message() instanceof ConfigRecord)
+ .map(m -> (ConfigRecord) m.message())
+ .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))
+ .toList();
+ assertEquals(1, disklessConfigRecords.size());
+ // Then diskless is always enabled
+ assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true")));
+
+ // Given the topic is registered
+ ctx.replay(result3.records());
+ assertEquals(
+ new PartitionRegistration.Builder().setReplicas(new int[] {1}).
+ setDirectories(new Uuid[] {
+ Uuid.fromString("TESTBROKER00001DIRAAAA"),
+ }).
+ setIsr(new int[] {1})
+ .setLeader(1)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ .build(),
+ replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0));
+
+ // When creating a topic with diskless enabled and already exists
+ ControllerResult result4 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData();
+ // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error
+ expectedResponse4.topics().add(new CreatableTopicResult().setName("foo").
+ setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
+ setErrorMessage("Topic 'foo' already exists."));
+ assertEquals(expectedResponse4, result4.response());
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {
+ "true,true",
+ "true,NULL",
+ "false,true",
+ }, nullValues = "NULL")
+ public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDefaultDisklessEnable(logDisklessEnableServerConfig)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given a request to create a kafka topic with diskless enabled
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ if (disklessEnableTopicConfig != null) {
+ creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue(disklessEnableTopicConfig));
+ }
+ request.topics().add(new CreatableTopic().setName("foo").
+ setNumPartitions(-1).setReplicationFactor((short) -1)
+ .setConfigs(creatableTopicConfigs));
+
+ // When creating a topic without brokers available
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should fail with BROKER_NOT_AVAILABLE error
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+ setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()).
+ setErrorMessage("No brokers available to create diskless topic."));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+
+ // Given brokers are registered
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0);
+
+ // When creating a topic with diskless enabled
+ ControllerResult result2 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should succeed, regardless of fenced brokers
+ CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
+ expectedResponse2.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 3).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result2.response().topics().find("foo").topicId()));
+ CreateTopicsResponseData response = result2.response();
+ assertEquals(expectedResponse2, withoutConfigs(response));
+
+ // Given all brokers unfenced
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // When creating a topic with diskless enabled
+ ControllerResult result3 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ // Then the topic creation should succeed, regardless of the RF
+ CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
+ expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
+ setNumPartitions(1).setReplicationFactor((short) 3).
+ setErrorMessage(null).setErrorCode((short) 0).
+ setTopicId(result3.response().topics().find("foo").topicId()));
+ assertEquals(expectedResponse3, withoutConfigs(result3.response()));
+ final List disklessConfigRecords = result3.records().stream()
+ .filter(m -> m.message() instanceof ConfigRecord)
+ .map(m -> (ConfigRecord) m.message())
+ .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))
+ .toList();
+ assertEquals(1, disklessConfigRecords.size());
+ // Then diskless is always enabled
+ assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true")));
+
+ // Given the topic is registered
+ ctx.replay(result3.records());
+ assertEquals(
+ new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2}).
+ setDirectories(new Uuid[] {
+ Uuid.fromString("TESTBROKER00000DIRAAAA"),
+ Uuid.fromString("TESTBROKER00001DIRAAAA"),
+ Uuid.fromString("TESTBROKER00002DIRAAAA"),
+ }).
+ setIsr(new int[] {0, 1, 2})
+ .setLeader(0)
+ .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
+ .setLeaderEpoch(0)
+ .setPartitionEpoch(0)
+ .build(),
+ replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0));
+
+ // When creating a topic with diskless enabled and already exists
+ ControllerResult result4 =
+ replicationControl.createTopics(requestContext, request, Set.of("foo"));
+ CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData();
+ // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error
+ expectedResponse4.topics().add(new CreatableTopicResult().setName("foo").
+ setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
+ setErrorMessage("Topic 'foo' already exists."));
+ assertEquals(expectedResponse4, result4.response());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "1, -2, INVALID_REPLICATION_FACTOR",
+ "1, 0, INVALID_REPLICATION_FACTOR",
+ "1, 2, INVALID_REPLICATION_FACTOR",
+ "-2, 1, INVALID_PARTITIONS",
+ "0, 1, INVALID_PARTITIONS",
+ })
+ public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+
+ CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig =
+ new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ disklessConfig.add(
+ new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName("diskless.enable")
+ .setValue("true")
+ );
+
+ CreateTopicsRequestData request1 = new CreateTopicsRequestData();
+ request1.topics().add(new CreatableTopic().setName("baz")
+ .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor)
+ .setConfigs(disklessConfig));
+
+ ControllerResult result1 =
+ replicationControl.createTopics(requestContext, request1, Set.of("baz"));
+ assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode());
+ assertEquals(List.of(), result1.records());
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {
+ "true,false",
+ "true,NULL"
+ // This case is not valid because no internal topic should be explicitly created with diskless enabled.
+ // Tested in testInvalidDisklessTopicCreationForInternalTopics
+ // "false,true",
+ }, nullValues = "NULL")
+ public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
+ // Given a setup with diskless defined at the server level
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDefaultDisklessEnable(logDisklessEnableServerConfig)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given an internal kafka topic with diskless enabled
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ if (disklessEnableTopicConfig != null) {
+ // If the diskless enable config is set, it should be added to the topic configs
+ creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue(disklessEnableTopicConfig));
+ }
+ final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+ request.topics().add(new CreatableTopic().setName(internalTopic).
+ setNumPartitions(-1).setReplicationFactor((short) -1)
+ .setConfigs(creatableTopicConfigs));
+ // Given all brokers unfenced
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+ // When creating an internal topic with diskless enabled, disable it
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Set.of(internalTopic));
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ // Then the topic creation should succeed with diskless disabled for internal topics
+ expectedResponse.topics().add(
+ new CreatableTopicResult()
+ .setName(internalTopic)
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 3)
+ .setErrorMessage(null).setErrorCode((short) 0)
+ .setTopicId(result.response().topics().find(internalTopic).topicId()));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+ assertTrue(result.response().topics().find(internalTopic)
+ .configs()
+ .stream()
+ .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)));
+ final List disklessConfigRecords = result.records().stream()
+ .filter(m -> m.message() instanceof ConfigRecord)
+ .map(m -> (ConfigRecord) m.message())
+ .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))
+ .toList();
+ // Then always diskless is disabled
+ assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false")));
+ }
+
+ @Test
+ public void testInvalidDisklessTopicCreationForInternalTopics() {
+ // Given a setup with diskless defined at the server level
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given an internal kafka topic with diskless enabled
+ final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue("true"));
+ request.topics().add(new CreatableTopic().setName(internalTopic).setConfigs(topicConfigs));
+ // Given all brokers unfenced
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+ // When creating an internal topic with diskless explicitly enabled
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Set.of(internalTopic));
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ // Then the topic creation should fail with INVALID_REQUEST error
+ expectedResponse.topics().add(
+ new CreatableTopicResult()
+ .setName(internalTopic)
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Internal topics cannot be diskless topics."));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = {
+ "false,true",
+ "true,NULL"
+ }, nullValues = "NULL")
+ public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) {
+ // Given a setup with diskless defined at the server level
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(false)
+ .setDefaultDisklessEnable(logDisklessEnableServerConfig)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+ // Given a regular kafka topic with diskless enabled while the diskless storage system is disabled
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ if (disklessEnableTopicConfig != null) {
+ // If the diskless enable config is set, it should be added to the topic configs
+ topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue(disklessEnableTopicConfig));
+ }
+ final String topicName = "foo";
+ request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs));
+ // Given all brokers unfenced
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+ // When creating the diskless topic without the diskless storage system enabled
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ ControllerResult result =
+ replicationControl.createTopics(requestContext, request, Set.of(topicName));
+ CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+ // Then the topic creation should fail with INVALID_REQUEST error
+ expectedResponse.topics().add(
+ new CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics."));
+ assertEquals(expectedResponse, withoutConfigs(result.response()));
+ }
+
+ @Test
+ public void testReassignDisklessPartitions() {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
+
+ String topic = "foo";
+ ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0);
+
+ // No change in the replication factor.
+ ControllerResult alterResult1 =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(List.of(
+ new ReassignableTopic().setName(topic).setPartitions(List.of(
+ new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1)))))));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+ setErrorMessage(null).setResponses(List.of(
+ new ReassignableTopicResponse().setName(topic).setPartitions(List.of(
+ new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+ alterResult1.response());
+
+ ctx.replay(alterResult1.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(List.of(new OngoingTopicReassignment().setName(topic).setPartitions(List.of(
+ new OngoingPartitionReassignment().setPartitionIndex(0)
+ .setRemovingReplicas(List.of(0))
+ .setAddingReplicas(List.of(1))
+ .setReplicas(List.of(1, 0))))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(List.of(
+ new ListPartitionReassignmentsTopics().setName(topic).
+ setPartitionIndexes(List.of(0))), Long.MAX_VALUE));
+
+ // Try to increase the replication factor.
+ ControllerResult alterResult2 =
+ replication.alterPartitionReassignments(
+ new AlterPartitionReassignmentsRequestData().setTopics(List.of(
+ new ReassignableTopic().setName(topic).setPartitions(List.of(
+ new ReassignablePartition().setPartitionIndex(0)
+ .setReplicas(List.of(0, 1)))))));
+ assertEquals(new AlterPartitionReassignmentsResponseData()
+ .setErrorMessage(null).setResponses(List.of(
+ new ReassignableTopicResponse().setName(topic).setPartitions(List.of(
+ new ReassignablePartitionResponse().setPartitionIndex(0)
+ .setErrorCode(INVALID_REPLICATION_FACTOR.code())
+ .setErrorMessage("The replication factor is changed from 1 to 2"))))),
+ alterResult2.response());
+ ctx.replay(alterResult2.records());
+ assertEquals(currentReassigning, replication.listPartitionReassignments(List.of(
+ new ListPartitionReassignmentsTopics().setName(topic)
+ .setPartitionIndexes(List.of(0))), Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testNoLeaderElectionOnBrokerFenced_noRacks() {
+ // With RF=1 (no racks), when the single replica is fenced, the leader goes offline
+ // and no new leader can be elected since there are no other replicas.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ new int[][] {
+ new int[] {0}
+ },
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ List records = new ArrayList<>();
+ replication.handleBrokerFenced(0, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after leader fencing");
+ assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there is only one replica");
+ assertEquals(-1, partition.leader, "Leader should be offline after fencing");
+ }
+
+ @Test
+ public void testLeaderElectionOnBrokerFenced_withRacks() {
+ // With RF=3 (racks), when the leader is fenced, a new leader should be elected
+ // from the remaining replicas in other racks.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ 1,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ PartitionRegistration partitionBefore = replication.getPartition(topicId, 0);
+ int originalLeader = partitionBefore.leader;
+
+ List records = new ArrayList<>();
+ replication.handleBrokerFenced(originalLeader, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after leader fencing");
+ assertEquals(3, partition.replicas.length, "Replicas should remain unchanged (RF=3)");
+ assertEquals(2, partition.isr.length, "ISR should shrink by 1 after fencing the leader");
+ assertNotEquals(originalLeader, partition.leader, "Leader should change after fencing");
+ assertTrue(partition.leader >= 0, "A new leader should be elected from remaining replicas");
+ }
+
+ @Test
+ public void testNoReplicaChangeOnShutdown_noRacks() {
+ // With RF=1 (no racks), when the single replica is shutdown, the leader goes offline
+ // and no new leader can be elected since there are no other replicas.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ new int[][] {
+ new int[] {0}
+ },
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ List records = new ArrayList<>();
+ replication.handleBrokerShutdown(0, true, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after leader shutdown");
+ assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there is only one replica");
+ assertEquals(-1, partition.leader, "Leader should be offline after shutdown");
+ }
+
+ @Test
+ public void testReplicaChangeOnShutdown_withRacks() {
+ // With RF=3 (racks), when the leader broker is shutdown, the ISR shrinks and a new leader
+ // is elected from the remaining replicas in other racks.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ 1,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ // Capture the actual leader before shutdown
+ PartitionRegistration initialPartition = replication.getPartition(topicId, 0);
+ int originalLeader = initialPartition.leader;
+
+ List records = new ArrayList<>();
+ replication.handleBrokerShutdown(originalLeader, true, records);
+ ctx.replay(records);
+
+ PartitionRegistration partition = replication.getPartition(topicId, 0);
+ assertNotNull(partition, "Partition should exist after broker shutdown");
+ assertEquals(3, partition.replicas.length, "Replicas should remain unchanged (RF=3)");
+ assertEquals(2, partition.isr.length, "ISR should shrink by 1 after shutdown");
+ assertTrue(partition.leader >= 0, "A new leader should be elected from remaining replicas");
+ assertNotEquals(originalLeader, partition.leader, "Leader should change after shutdown");
+ }
+
+ @Test
+ void testDisklessMarksLeaderOfflineOnUnregister_noRacks() {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ final int numPartitions = 6;
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ numPartitions,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ List records = new ArrayList<>();
+ replication.handleBrokerUnregistered(0, 100, records);
+ ctx.replay(records);
+
+ // All partitions should remain present and keep the original replica/ISR,
+ // only the leader should be marked offline if placed on the unregistered broker.
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ PartitionRegistration partition = replication.getPartition(topicId, partitionId);
+ assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration");
+ assertEquals(1, partition.replicas.length, "Replicas [" + Arrays.toString(partition.replicas) + "] should stay unchanged for partition " + partitionId);
+ assertEquals(1, partition.isr.length, "ISR [" + Arrays.toString(partition.isr) + "] should stay unchanged for partition " + partitionId);
+ if (partition.preferredReplica() == 0) {
+ assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId);
+ } else {
+ assertTrue(partition.leader > 0, "Leader should be online for partition " + partitionId);
+ }
+ }
+ // Sticking to keep partitions offline, as availability is managed by the Diskless metadata transformation
+ // with a fallback to "any node available"; not the KRaft registered metadata.
+ // Replicas will be reported as offline, so operators are aware of the underprovisioning, and can act on it.
+ // If they need to move the replicas, they can do that using regular tooling.
+ }
+
+ @Test
+ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ final int numPartitions = 6;
+ CreatableTopicResult createResult = ctx.createTestTopic(
+ "foo",
+ numPartitions,
+ (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ final Uuid topicId = createResult.topicId();
+
+ List records = new ArrayList<>();
+ replication.handleBrokerUnregistered(0, 100, records);
+ ctx.replay(records);
+
+ // All partitions should remain present with original replicas.
+ // ISR should shrink and leaders should move to other brokers.
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ PartitionRegistration partition = replication.getPartition(topicId, partitionId);
+ assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration");
+ assertEquals(3, partition.replicas.length, "Replicas should stay unchanged for partition " + partitionId);
+ assertEquals(2, partition.isr.length, "ISR should shrink to 2 for partition " + partitionId);
+ assertTrue(partition.leader > 0, "Leader should remain online for partition " + partitionId);
+ }
+ }
+
+ @Test
+ void testManualReplicaAssignmentsShouldBeAllowed() {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // Expectation: providing manual replica assignments for a diskless topic with managed-replicas should be allowed.
+ ctx.createTestTopic(
+ "foo",
+ new int[][] {new int[] {0, 1}, new int[] {1, 2}},
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"),
+ NONE.code()
+ );
+ }
+ }
}
From 68480d4fbffe18511dc151d730919d371bc6d59d Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Mon, 9 Mar 2026 13:42:56 -0500
Subject: [PATCH 4/5] fixup! feat(controller:diskless): add server config for
managed replicas
---
.../apache/kafka/server/config/ServerConfigs.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
index 6970984e5c6..9a69ba10d01 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
@@ -142,16 +142,16 @@ public class ServerConfigs {
"This should only be enabled in non-production environments for testing or migration purposes. " +
"When enabled, topics can have their diskless.enable config changed from false to true.";
- // When enabled, diskless topics are created with RF = rack_count (one replica per rack).
- // If brokers are registered but none have a rack configured, RF defaults to 1.
- // If no brokers are registered, topic creation fails with BROKER_NOT_AVAILABLE.
- // When disabled (default), diskless topics use legacy RF=1 behavior.
+ // When enabled, diskless topics are created with user-defined replication factor.
+ // RF=-1 resolves to default.replication.factor. Explicit RF values (1, 2, 3, ...) are accepted.
+ // Placement uses standard rack-aware assignment.
+ // When disabled (default), diskless topics use legacy RF=1 behavior (RF=-1 resolves to 1, RF > 1 rejected).
// This config only affects topic creation.
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable";
public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false;
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " +
- "with replication factor equal to the number of racks (one replica per rack). " +
- "If brokers are registered but none have a rack configured, RF defaults to 1. " +
+ "with user-defined replication factor. RF=-1 resolves to default.replication.factor. " +
+ "Explicit RF values are accepted. Placement uses standard rack-aware assignment. " +
"When disabled, diskless topics use legacy RF=1 behavior. " +
"This config only affects topic creation.";
From aed14b5449231cc66c1eb033acbc08fb164f120f Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
Date: Mon, 9 Mar 2026 13:43:38 -0500
Subject: [PATCH 5/5] fixup! feat(metadata:diskless): implement managed
replicas for diskless topics
---
.../controller/ReplicationControlManager.java | 50 ++-----
.../ReplicationControlManagerTest.java | 141 ++++++++++++++----
2 files changed, 119 insertions(+), 72 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 9c4a37595b0..1990b68b7e5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -25,7 +25,6 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
-import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
@@ -825,13 +824,12 @@ private ApiError createTopic(ControllerRequestContext context,
"when the diskless storage system is disabled. " +
"Please enable the diskless storage system to create diskless topics.");
}
- // Diskless RF validation: accept -1 (auto) or 1 (backward compat) only.
- // Explicit RF > 1 rejected: users shouldn't need to know rack topology.
- // Note: RF=1 is accepted for API backward compatibility, but when managed replicas
- // are enabled, the actual RF is always computed from rack topology (rackCardinality).
- if (Math.abs(topic.replicationFactor()) != 1) {
+ // Diskless RF validation:
+ // When managed replicas enabled: any valid RF accepted (standard Kafka validation applies later).
+ // When managed replicas disabled (legacy): only RF=1 or RF=-1 (resolves to 1).
+ if (!isDisklessManagedReplicasEnabled && Math.abs(topic.replicationFactor()) != 1) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
- "Replication factor for diskless topics must be 1 or -1 (system-computed from rack topology).");
+ "Replication factor for diskless topics must be 1 or -1 when managed replicas are disabled.");
}
}
@@ -895,17 +893,15 @@ private ApiError createTopic(ControllerRequestContext context,
int numPartitions = topic.numPartitions() == -1 ?
defaultNumPartitions : topic.numPartitions();
short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor();
- // For managed diskless: always use rackCardinality() regardless of requested RF.
- // RF=1 in the request is accepted for backward compat but overridden here.
- // Throws BrokerNotAvailableException or InvalidReplicationFactorException on failure,
- // which are caught by the caller and converted to ApiError.
- short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality() : 1;
+ // For managed diskless: use same resolution as classic (RF=-1 → defaultReplicationFactor, else user value).
+ // For unmanaged diskless (legacy): always RF=1.
+ short disklessReplicationFactor = isDisklessManagedReplicasEnabled ? classicReplicationFactor : 1;
short replicationFactor = disklessEnabled ? disklessReplicationFactor : classicReplicationFactor;
try {
TopicAssignment topicAssignment;
Predicate brokerFilter;
- // Diskless managed-replicas is equivalent to classic topic assignment,
- // but RF is defined by number of racks
+ // Managed-replicas diskless topics use the standard rack-aware assignment
+ // with the user-defined RF (or defaultReplicationFactor when RF=-1).
if (!disklessEnabled || isDisklessManagedReplicasEnabled) {
topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec(
0,
@@ -976,32 +972,6 @@ private ApiError createTopic(ControllerRequestContext context,
return ApiError.NONE;
}
- /**
- * Computes the replication factor for diskless topics based on rack topology.
- * Returns the number of distinct racks in the cluster, ensuring one replica per rack.
- * Brokers with no rack configured are all treated as belonging to a single logical rack,
- * so if at least one broker is registered but none have a rack configured, the result is RF=1.
- *
- * @return the number of distinct racks as a short
- * @throws BrokerNotAvailableException if no brokers are registered
- * @throws InvalidReplicationFactorException if rack count exceeds Short.MAX_VALUE
- */
- private short rackCardinality() {
- final Collection brokerRegistrations = clusterControl.brokerRegistrations().values();
- final long racks = brokerRegistrations.stream()
- .map(BrokerRegistration::rack)
- .distinct()
- .count();
- if (racks > Short.MAX_VALUE) {
- // Unfeasible but technically possible scenario.
- // Would require more than 32,768 brokers and each with a different rack
- throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation.");
- }
- if (racks == 0)
- throw new BrokerNotAvailableException("No brokers available to create diskless topic.");
- return (short) racks;
- }
-
private boolean disklessEnabledOnTopicCreation(final Map creationConfigs) {
final String disklessEnableConfigValue = creationConfigs.get(DISKLESS_ENABLE_CONFIG);
final boolean disklessConfigEnabled;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index eb1a0f382b9..db543330445 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -4596,12 +4596,10 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult result =
replicationControl.createTopics(requestContext, request, Set.of("foo"));
- // Then the topic creation should fail with BROKER_NOT_AVAILABLE error
- CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
- expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
- setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()).
- setErrorMessage("No brokers available to create diskless topic."));
- assertEquals(expectedResponse, withoutConfigs(result.response()));
+ // Then the topic creation should fail with INVALID_REPLICATION_FACTOR
+ // (standard Kafka behavior when no brokers can satisfy the requested RF)
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+ result.response().topics().find("foo").errorCode());
// Given brokers are registered
ctx.registerBrokers(0, 1, 2);
@@ -4610,10 +4608,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi
// When creating a topic with diskless enabled
ControllerResult result2 =
replicationControl.createTopics(requestContext, request, Set.of("foo"));
- // Then the topic creation should succeed, regardless of fenced brokers
+ // Then the topic creation should succeed with RF=3 (default.replication.factor),
+ // regardless of fenced brokers
CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
expectedResponse2.topics().add(new CreatableTopicResult().setName("foo").
- setNumPartitions(1).setReplicationFactor((short) 1).
+ setNumPartitions(1).setReplicationFactor((short) 3).
setErrorMessage(null).setErrorCode((short) 0).
setTopicId(result2.response().topics().find("foo").topicId()));
CreateTopicsResponseData response = result2.response();
@@ -4626,10 +4625,10 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi
// When creating a topic with diskless enabled
ControllerResult result3 =
replicationControl.createTopics(requestContext, request, Set.of("foo"));
- // Then the topic creation should succeed, regardless of the RF
+ // Then the topic creation should succeed with RF=3 (default.replication.factor)
CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
- setNumPartitions(1).setReplicationFactor((short) 1).
+ setNumPartitions(1).setReplicationFactor((short) 3).
setErrorMessage(null).setErrorCode((short) 0).
setTopicId(result3.response().topics().find("foo").topicId()));
assertEquals(expectedResponse3, withoutConfigs(result3.response()));
@@ -4644,18 +4643,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi
// Given the topic is registered
ctx.replay(result3.records());
- assertEquals(
- new PartitionRegistration.Builder().setReplicas(new int[] {1}).
- setDirectories(new Uuid[] {
- Uuid.fromString("TESTBROKER00001DIRAAAA"),
- }).
- setIsr(new int[] {1})
- .setLeader(1)
- .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
- .setLeaderEpoch(0)
- .setPartitionEpoch(0)
- .build(),
- replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0));
+ PartitionRegistration partition = replicationControl.getPartition(
+ ((TopicRecord) result3.records().get(0).message()).topicId(), 0);
+ assertEquals(3, partition.replicas.length, "RF should be 3 (default.replication.factor)");
+ assertEquals(3, partition.isr.length, "All brokers are active so ISR should equal replicas");
+ assertTrue(partition.leader >= 0, "Leader should be elected");
// When creating a topic with diskless enabled and already exists
ControllerResult result4 =
@@ -4697,12 +4689,10 @@ public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerCon
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult result =
replicationControl.createTopics(requestContext, request, Set.of("foo"));
- // Then the topic creation should fail with BROKER_NOT_AVAILABLE error
- CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
- expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
- setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()).
- setErrorMessage("No brokers available to create diskless topic."));
- assertEquals(expectedResponse, withoutConfigs(result.response()));
+ // Then the topic creation should fail with INVALID_REPLICATION_FACTOR
+ // (standard Kafka behavior when no brokers can satisfy the requested RF)
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+ result.response().topics().find("foo").errorCode());
// Given brokers are registered
ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
@@ -4775,7 +4765,6 @@ public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerCon
@CsvSource({
"1, -2, INVALID_REPLICATION_FACTOR",
"1, 0, INVALID_REPLICATION_FACTOR",
- "1, 2, INVALID_REPLICATION_FACTOR",
"-2, 1, INVALID_PARTITIONS",
"0, 1, INVALID_PARTITIONS",
})
@@ -5049,7 +5038,7 @@ public void testLeaderElectionOnBrokerFenced_withRacks() {
CreatableTopicResult createResult = ctx.createTestTopic(
"foo",
1,
- (short) 1,
+ (short) 3,
Map.of(DISKLESS_ENABLE_CONFIG, "true"),
NONE.code()
);
@@ -5117,7 +5106,7 @@ public void testReplicaChangeOnShutdown_withRacks() {
CreatableTopicResult createResult = ctx.createTestTopic(
"foo",
1,
- (short) 1,
+ (short) 3,
Map.of(DISKLESS_ENABLE_CONFIG, "true"),
NONE.code()
);
@@ -5196,7 +5185,7 @@ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() {
CreatableTopicResult createResult = ctx.createTestTopic(
"foo",
numPartitions,
- (short) 1,
+ (short) 3,
Map.of(DISKLESS_ENABLE_CONFIG, "true"),
NONE.code()
);
@@ -5234,5 +5223,93 @@ void testManualReplicaAssignmentsShouldBeAllowed() {
NONE.code()
);
}
+
+ @Test
+ void testCreateDisklessTopicWithExplicitRF() {
+ // Verify that an explicit RF=2 is honored: neither overridden to the rack count (old managed behavior) nor rejected (legacy RF=1 behavior).
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c");
+ ctx.unfenceBrokers(0, 1, 2);
+
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo")
+ .setNumPartitions(2).setReplicationFactor((short) 2)
+ .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of(
+ new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue("true")
+ ).iterator())));
+
+ ControllerResult result =
+ replication.createTopics(requestContext, request, Set.of("foo"));
+ assertEquals(NONE.code(), result.response().topics().find("foo").errorCode());
+ assertEquals(2, result.response().topics().find("foo").replicationFactor());
+ assertEquals(2, result.response().topics().find("foo").numPartitions());
+
+ ctx.replay(result.records());
+ PartitionRegistration partition = replication.getPartition(
+ ((TopicRecord) result.records().get(0).message()).topicId(), 0);
+ assertEquals(2, partition.replicas.length, "RF=2 should be honored");
+ assertEquals(2, partition.isr.length, "All replicas should be in ISR");
+ }
+
+ @Test
+ void testCreateDisklessTopicWithRFExceedingBrokerCount() {
+ // Verify that RF > broker count fails with the standard Kafka INVALID_REPLICATION_FACTOR error.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo")
+ .setNumPartitions(1).setReplicationFactor((short) 5)
+ .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of(
+ new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue("true")
+ ).iterator())));
+
+ ControllerResult result =
+ replication.createTopics(requestContext, request, Set.of("foo"));
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+ result.response().topics().find("foo").errorCode());
+ }
+
+ @Test
+ void testCreateDisklessTopicWithRFGreaterThanOneRejectedWhenManagedDisabled() {
+ // When managed replicas is disabled, RF > 1 should be rejected.
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(false)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
+ CreateTopicsRequestData request = new CreateTopicsRequestData();
+ request.topics().add(new CreatableTopic().setName("foo")
+ .setNumPartitions(1).setReplicationFactor((short) 2)
+ .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of(
+ new CreateTopicsRequestData.CreatableTopicConfig()
+ .setName(DISKLESS_ENABLE_CONFIG)
+ .setValue("true")
+ ).iterator())));
+
+ ControllerResult result =
+ replication.createTopics(requestContext, request, Set.of("foo"));
+ assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+ result.response().topics().find("foo").errorCode());
+ }
}
}