From 9a37b242d5ffa4b0eab1f9326f7f4dcb798d675d Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
- * RF from existing partitions (correct behavior - maintains consistency within the topic).
*/
private final boolean isDisklessManagedReplicasEnabled;
@@ -2306,11 +2304,19 @@ void createPartitions(ControllerRequestContext context,
).assignments();
isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList();
}
+ // For unmanaged diskless, ISR includes all replicas regardless of fenced state —
+ // consistent with topic creation. Data lives in object storage, so broker fencing
+ // doesn't affect data availability.
+ boolean isDiskless = isDisklessTopic(topic.name());
+ Predicate> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null)));
+ assertEquals(NONE.code(), addResult.response().get(0).errorCode());
+ ctx.replay(addResult.records());
+
+ // Verify new partitions have RF=1 (inherited from existing partition)
+ for (int p = 0; p < 3; p++) {
+ PartitionRegistration partition = replication.getPartition(createResult.topicId(), p);
+ assertNotNull(partition, "Partition " + p + " should exist");
+ assertEquals(1, partition.replicas.length,
+ "Partition " + p + " should have RF=1");
+ }
+ }
+
+ @Test
+ public void testAddPartitionsManualAssignmentRejectedForUnmanaged() {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .setDisklessStorageSystemEnabled(true)
+ .build();
+
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1);
+ ctx.unfenceBrokers(0, 1);
+
+ // Create a diskless topic with RF=1 and 1 partition (unmanaged: managed replicas are not enabled above)
+ String topic = "foo";
+ ctx.createTestTopic(topic, 1, (short) 1,
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());
+
+ // Grow to 2 partitions with an explicit broker assignment — the request must be rejected
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
+ ControllerResult<List<CreatePartitionsTopicResult>> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of(
+ new CreatePartitionsAssignment().setBrokerIds(List.of(1))))));
+ assertEquals(INVALID_REPLICA_ASSIGNMENT.code(), addResult.response().get(0).errorCode());
+ assertEquals("A manual partition assignment cannot be specified for diskless topics.",
+ addResult.response().get(0).errorMessage());
+ }
}
@Nested
@@ -5142,6 +5205,113 @@ public void testPeriodicLeaderBalancingSkipsDisklessTopics() {
"Periodic leader balancing should skip diskless topics");
}
+ @Test
+ public void testAddPartitionsAutoPlacement() {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // Create a diskless topic with RF=2, 1 partition (replicas on brokers 0 and 1)
+ String topic = "foo";
+ CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}},
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());
+
+ // Grow to 3 partitions in total (auto-placement, no manual assignments)
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
+ ControllerResult<List<CreatePartitionsTopicResult>> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null)));
+ assertEquals(NONE.code(), addResult.response().get(0).errorCode());
+ ctx.replay(addResult.records());
+
+ // Verify every partition (the original one and the two added) has RF=2 and a non-empty ISR
+ for (int p = 0; p < 3; p++) {
+ PartitionRegistration partition = replication.getPartition(createResult.topicId(), p);
+ assertNotNull(partition, "Partition " + p + " should exist");
+ assertEquals(2, partition.replicas.length,
+ "Partition " + p + " should have RF=2");
+ assertTrue(partition.isr.length > 0,
+ "Partition " + p + " should have non-empty ISR");
+ }
+ }
+
+ @Test
+ public void testAddPartitionsManualAssignment() {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // Create a diskless topic with RF=2, 1 partition (replicas on brokers 0 and 1)
+ String topic = "foo";
+ CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}},
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());
+
+ // Grow to 2 partitions, placing the new one on brokers 1 and 2 via manual assignment
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
+ ControllerResult<List<CreatePartitionsTopicResult>> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of(
+ new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2))))));
+ assertEquals(NONE.code(), addResult.response().get(0).errorCode());
+ ctx.replay(addResult.records());
+
+ // Verify the new partition (index 1) has exactly the requested replicas
+ PartitionRegistration partition = replication.getPartition(createResult.topicId(), 1);
+ assertEquals(List.of(1, 2), Replicas.toList(partition.replicas));
+ // Both assigned brokers are unfenced, so the ISR is expected to match the replica list
+ assertEquals(List.of(1, 2), Replicas.toList(partition.isr));
+ }
+
+ @Test
+ public void testAddPartitionsIsrExcludesFencedBrokers() {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .setDisklessStorageSystemEnabled(true)
+ .setDisklessManagedReplicasEnabled(true)
+ .build();
+
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2);
+ ctx.unfenceBrokers(0, 1, 2);
+
+ // Create a diskless topic with RF=2, 1 partition (replicas on brokers 0 and 1)
+ String topic = "foo";
+ CreatableTopicResult createResult = ctx.createTestTopic(topic, new int[][] {new int[] {0, 1}},
+ Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());
+
+ // Fence broker 2 after topic creation, before the partition is added
+ ctx.fenceBrokers(2);
+
+ // Grow to 2 partitions with a manual assignment that includes the fenced broker
+ ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
+ ControllerResult<List<CreatePartitionsTopicResult>> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(2).setAssignments(List.of(
+ new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2))))));
+ assertEquals(NONE.code(), addResult.response().get(0).errorCode());
+ ctx.replay(addResult.records());
+
+ // Replicas keep both brokers, but the ISR must contain only the unfenced broker 1
+ PartitionRegistration partition = replication.getPartition(createResult.topicId(), 1);
+ assertEquals(List.of(1, 2), Replicas.toList(partition.replicas));
+ assertEquals(List.of(1), Replicas.toList(partition.isr));
+ }
+
@Test
public void testNoLeaderElectionOnBrokerFenced_noRacks() {
// With RF=1 (no racks), when the single replica is fenced, the leader goes offline
From a8fe232fcb129322373684296e9b2af01f8e4a74 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya
> addResult =
+ replication.createPartitions(requestContext, List.of(
+ new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null)));
+ assertEquals(NONE.code(), addResult.response().get(0).errorCode());
+ ctx.replay(addResult.records());
+
+ // Verify new partitions are placed on unfenced brokers
+ for (int p = 1; p < 3; p++) {
+ PartitionRegistration partition = replication.getPartition(createResult.topicId(), p);
+ assertNotNull(partition, "Partition " + p + " should exist");
+ assertEquals(1, partition.replicas.length,
+ "Partition " + p + " should have RF=1");
+ assertNotEquals(2, partition.replicas[0],
+ "Partition " + p + " should not be placed on fenced broker");
+ assertTrue(partition.leader >= 0,
+ "Partition " + p + " should have a valid leader");
}
}
From d07f12410868d2bea86206c2085274bd48fc0033 Mon Sep 17 00:00:00 2001
From: Jorge Esteban Quilcate Otoya