From 9a37b242d5ffa4b0eab1f9326f7f4dcb798d675d Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 13 Mar 2026 13:40:08 -0500 Subject: [PATCH 1/3] feat(controller:diskless): add partitions support for diskless topics Add partitions now works correctly for diskless topics: - Auto-placement inherits RF from existing partitions and uses standard ReplicaPlacer (works for both managed and unmanaged) - Manual assignments are allowed for managed-RF diskless topics - Manual assignments are rejected for unmanaged diskless topics, matching the createTopics restriction - ISR for new partitions correctly filters to active brokers only --- .../controller/ReplicationControlManager.java | 4 + .../ReplicationControlManagerTest.java | 170 ++++++++++++++++++ 2 files changed, 174 insertions(+) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 4498075ea6..b03d8a828e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -2253,6 +2253,10 @@ void createPartitions(ControllerRequestContext context, " additional partition(s), but only " + topic.assignments().size() + " assignment(s) were specified."); } + if (isDisklessTopic(topic.name()) && !isDisklessManagedReplicasEnabled) { + throw new InvalidReplicaAssignmentException( + "A manual partition assignment cannot be specified for diskless topics."); + } } try { context.applyPartitionChangeQuota(additional); // check controller mutation quota diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 3c3f4bb156..65fdbae8cf 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -4525,6 +4525,69 @@ void testManualReplicaAssignmentsShouldBeRejected() { INVALID_REQUEST.code() ); } + + @Test + public void testAddPartitionsAutoPlacement() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + // Create a diskless topic with RF=1 (unmanaged), 1 partition + String topic = "foo"; + CreatableTopicResult createResult = ctx.createTestTopic( + topic, 1, (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code()); + + // Add 2 more partitions (auto-placement, no manual assignments) + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null))); + assertEquals(NONE.code(), addResult.response().get(0).errorCode()); + ctx.replay(addResult.records()); + + // Verify new partitions have RF=1 (inherited from existing partition) + for (int p = 0; p < 3; p++) { + PartitionRegistration partition = replication.getPartition(createResult.topicId(), p); + assertNotNull(partition, "Partition " + p + " should exist"); + assertEquals(1, partition.replicas.length, + "Partition " + p + " should have RF=1"); + } + } + + @Test + public void testAddPartitionsManualAssignmentRejectedForUnmanaged() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .build(); + + ReplicationControlManager 
replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + // Create a diskless topic with RF=1 (unmanaged), 1 partition + String topic = "foo"; + ctx.createTestTopic(topic, 1, (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code()); + + // Try to add 1 partition with manual assignment — should be rejected + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of( + new CreatePartitionsAssignment().setBrokerIds(List.of(1)))))); + assertEquals(INVALID_REPLICA_ASSIGNMENT.code(), addResult.response().get(0).errorCode()); + assertEquals("A manual partition assignment cannot be specified for diskless topics.", + addResult.response().get(0).errorMessage()); + } } @Nested @@ -5142,6 +5205,113 @@ public void testPeriodicLeaderBalancingSkipsDisklessTopics() { "Periodic leader balancing should skip diskless topics"); } + @Test + public void testAddPartitionsAutoPlacement() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=2, 1 partition + String topic = "foo"; + CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // Add 2 more partitions (auto-placement, no manual assignments) + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + 
replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null))); + assertEquals(NONE.code(), addResult.response().get(0).errorCode()); + ctx.replay(addResult.records()); + + // Verify new partitions have RF=2 (inherited from existing partitions) + for (int p = 0; p < 3; p++) { + PartitionRegistration partition = replication.getPartition(createResult.topicId(), p); + assertNotNull(partition, "Partition " + p + " should exist"); + assertEquals(2, partition.replicas.length, + "Partition " + p + " should have RF=2"); + assertTrue(partition.isr.length > 0, + "Partition " + p + " should have non-empty ISR"); + } + } + + @Test + public void testAddPartitionsManualAssignment() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=2, 1 partition + String topic = "foo"; + CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // Add 1 partition with manual assignment + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of( + new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2)))))); + assertEquals(NONE.code(), addResult.response().get(0).errorCode()); + ctx.replay(addResult.records()); + + // Verify new partition has the specified replicas + PartitionRegistration partition = 
replication.getPartition(createResult.topicId(), 1); + assertEquals(List.of(1, 2), Replicas.toList(partition.replicas)); + // ISR should include only active brokers (both are active) + assertEquals(List.of(1, 2), Replicas.toList(partition.isr)); + } + + @Test + public void testAddPartitionsIsrExcludesFencedBrokers() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=2, 1 partition + String topic = "foo"; + CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // Fence broker 2 + ctx.fenceBrokers(2); + + // Add 1 partition with manual assignment including fenced broker + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of( + new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2)))))); + assertEquals(NONE.code(), addResult.response().get(0).errorCode()); + ctx.replay(addResult.records()); + + // Replicas include both, but ISR only includes the active broker + PartitionRegistration partition = replication.getPartition(createResult.topicId(), 1); + assertEquals(List.of(1, 2), Replicas.toList(partition.replicas)); + assertEquals(List.of(1), Replicas.toList(partition.isr)); + } + @Test public void testNoLeaderElectionOnBrokerFenced_noRacks() { // With RF=1 (no racks), when the single replica is fenced, the leader goes offline From 
a8fe232fcb129322373684296e9b2af01f8e4a74 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 16 Mar 2026 11:34:45 -0500 Subject: [PATCH 2/3] refactor(controller:diskless): handle add partition for unmanaged replicas Currently, adding partitions runs the replica placer and filters the ISR of new partitions down to active brokers, which is not aligned with the topic-creation logic for unmanaged diskless replicas. This change separates the add-partitions logic for unmanaged and managed diskless replicas: for unmanaged diskless topics, the ISR of new partitions now includes all assigned replicas regardless of fencing state (consistent with topic creation, since the data lives in object storage), while managed diskless topics keep filtering the ISR to active brokers. --- .../controller/ReplicationControlManager.java | 16 ++++--- .../ReplicationControlManagerTest.java | 44 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index b03d8a828e..ffbcecc687 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -349,11 +349,9 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy; /** - * When true, diskless topics use managed replicas with RF = rack_count (one replica per rack). + * When true, diskless topics use managed replicas with user-defined RF + * (or {@code default.replication.factor} when RF=-1). * When false, diskless topics use legacy RF=1 behavior. - *

Phase 1 limitation: This config only affects topic creation. Add Partitions inherits - * RF from existing partitions (correct behavior - maintains consistency within the topic). */ private final boolean isDisklessManagedReplicasEnabled; @@ -2306,11 +2304,19 @@ void createPartitions(ControllerRequestContext context, ).assignments(); isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList(); } + // For unmanaged diskless, ISR includes all replicas regardless of fenced state — + // consistent with topic creation. Data lives in object storage, so broker fencing + // doesn't affect data availability. + boolean isDiskless = isDisklessTopic(topic.name()); + Predicate brokerFilter = (isDiskless && !isDisklessManagedReplicasEnabled) + ? x -> true + : clusterControl::isActive; + int partitionId = startPartitionId; for (int i = 0; i < partitionAssignments.size(); i++) { PartitionAssignment partitionAssignment = partitionAssignments.get(i); List isr = isrs.get(i).stream(). - filter(clusterControl::isActive).toList(); + filter(brokerFilter).toList(); // If the ISR is empty, it means that all brokers are fenced or // in controlled shutdown. To be consistent with the replica placer, // we reject the create topic request with INVALID_REPLICATION_FACTOR. 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 65fdbae8cf..e162899b3b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -4558,6 +4558,50 @@ public void testAddPartitionsAutoPlacement() { assertNotNull(partition, "Partition " + p + " should exist"); assertEquals(1, partition.replicas.length, "Partition " + p + " should have RF=1"); + assertTrue(partition.leader >= 0, + "Partition " + p + " should have a valid leader"); + } + } + + @Test + public void testAddPartitionsWithFencedBroker() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Create a diskless topic with RF=1 (unmanaged), 1 partition + String topic = "foo"; + CreatableTopicResult createResult = ctx.createTestTopic( + topic, 1, (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code()); + + // Fence broker 2 + ctx.fenceBrokers(2); + + // Add 2 more partitions — should succeed, new partitions placed on unfenced brokers + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS); + ControllerResult> addResult = + replication.createPartitions(requestContext, List.of( + new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null))); + assertEquals(NONE.code(), addResult.response().get(0).errorCode()); + ctx.replay(addResult.records()); + + // Verify new partitions are placed on unfenced brokers + for (int p = 1; p < 3; p++) { + PartitionRegistration 
partition = replication.getPartition(createResult.topicId(), p); + assertNotNull(partition, "Partition " + p + " should exist"); + assertEquals(1, partition.replicas.length, + "Partition " + p + " should have RF=1"); + assertNotEquals(2, partition.replicas[0], + "Partition " + p + " should not be placed on fenced broker"); + assertTrue(partition.leader >= 0, + "Partition " + p + " should have a valid leader"); } } From d07f12410868d2bea86206c2085274bd48fc0033 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 16 Mar 2026 11:42:13 -0500 Subject: [PATCH 3/3] fix(config:diskless): update managed replicas config doc to reflect add-partitions support The config comment and doc string said "only affects topic creation" but add-partitions now also respects the managed replicas setting. --- .../java/org/apache/kafka/server/config/ServerConfigs.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 9a69ba10d0..532ed0b0a5 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -146,14 +146,13 @@ public class ServerConfigs { // RF=-1 resolves to default.replication.factor. Explicit RF values (1, 2, 3, ...) are accepted. // Placement uses standard rack-aware assignment. // When disabled (default), diskless topics use legacy RF=1 behavior (RF=-1 resolves to 1, RF > 1 rejected). - // This config only affects topic creation. + // This config affects topic creation and add-partitions (manual assignments allowed when enabled). 
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable"; public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false; public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " + "with user-defined replication factor. RF=-1 resolves to default.replication.factor. " + "Explicit RF values are accepted. Placement uses standard rack-aware assignment. " + - "When disabled, diskless topics use legacy RF=1 behavior. " + - "This config only affects topic creation."; + "When disabled, diskless topics use legacy RF=1 behavior."; public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG = "classic.remote.storage.force.enable"; public static final boolean CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT = false;