Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,9 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy;

/**
* When true, diskless topics use managed replicas with RF = rack_count (one replica per rack).
* When true, diskless topics use managed replicas with user-defined RF
* (or {@code default.replication.factor} when RF=-1).
* When false, diskless topics use legacy RF=1 behavior.
*
* <p>Phase 1 limitation: This config only affects topic creation. Add Partitions inherits
* RF from existing partitions (correct behavior - maintains consistency within the topic).
*/
private final boolean isDisklessManagedReplicasEnabled;

Expand Down Expand Up @@ -2253,6 +2251,10 @@ void createPartitions(ControllerRequestContext context,
" additional partition(s), but only " + topic.assignments().size() +
" assignment(s) were specified.");
}
if (isDisklessTopic(topic.name()) && !isDisklessManagedReplicasEnabled) {
throw new InvalidReplicaAssignmentException(
"A manual partition assignment cannot be specified for diskless topics.");
}
Comment thread
jeqo marked this conversation as resolved.
}
try {
context.applyPartitionChangeQuota(additional); // check controller mutation quota
Expand Down Expand Up @@ -2302,11 +2304,19 @@ void createPartitions(ControllerRequestContext context,
).assignments();
isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList();
}
// For unmanaged diskless, ISR includes all replicas regardless of fenced state —
// consistent with topic creation. Data lives in object storage, so broker fencing
// doesn't affect data availability.
boolean isDiskless = isDisklessTopic(topic.name());
Predicate<Integer> brokerFilter = (isDiskless && !isDisklessManagedReplicasEnabled)
? x -> true
: clusterControl::isActive;

int partitionId = startPartitionId;
for (int i = 0; i < partitionAssignments.size(); i++) {
PartitionAssignment partitionAssignment = partitionAssignments.get(i);
List<Integer> isr = isrs.get(i).stream().
filter(clusterControl::isActive).toList();
filter(brokerFilter).toList();
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica placer,
// we reject the create topic request with INVALID_REPLICATION_FACTOR.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4525,6 +4525,113 @@ void testManualReplicaAssignmentsShouldBeRejected() {
INVALID_REQUEST.code()
);
}

@Test
public void testAddPartitionsAutoPlacement() {
Comment thread
jeqo marked this conversation as resolved.
MetadataVersion metadataVersion = MetadataVersion.latestTesting();
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
.setMetadataVersion(metadataVersion)
.setDisklessStorageSystemEnabled(true)
.build();

ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1);
ctx.unfenceBrokers(0, 1);

// Create a diskless topic with RF=1 (unmanaged), 1 partition
String topic = "foo";
CreatableTopicResult createResult = ctx.createTestTopic(
topic, 1, (short) 1,
Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

// Add 2 more partitions (auto-placement, no manual assignments)
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
ControllerResult<List<CreatePartitionsTopicResult>> addResult =
replication.createPartitions(requestContext, List.of(
new CreatePartitionsTopic().setName(topic).setCount(3).setAssignments(null)));
assertEquals(NONE.code(), addResult.response().get(0).errorCode());
ctx.replay(addResult.records());

// Verify new partitions have RF=1 (inherited from existing partition)
for (int p = 0; p < 3; p++) {
PartitionRegistration partition = replication.getPartition(createResult.topicId(), p);
assertNotNull(partition, "Partition " + p + " should exist");
assertEquals(1, partition.replicas.length,
"Partition " + p + " should have RF=1");
assertTrue(partition.leader >= 0,
"Partition " + p + " should have a valid leader");
}
}

@Test
public void testAddPartitionsWithFencedBroker() {
    // Diskless storage is enabled; managed replicas are not switched on in this builder.
    ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.latestTesting())
        .setDisklessStorageSystemEnabled(true)
        .build();
    ReplicationControlManager manager = testContext.replicationControl;
    testContext.registerBrokers(0, 1, 2);
    testContext.unfenceBrokers(0, 1, 2);

    // Seed a single-partition diskless topic with RF=1.
    final String topicName = "foo";
    CreatableTopicResult created = testContext.createTestTopic(
        topicName, 1, (short) 1,
        Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

    // Take broker 2 out of service before growing the topic.
    testContext.fenceBrokers(2);

    // Growing to 3 partitions is expected to succeed despite the fenced broker.
    ControllerRequestContext createPartitionsContext =
        anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
    CreatePartitionsTopic growRequest = new CreatePartitionsTopic()
        .setName(topicName)
        .setCount(3)
        .setAssignments(null);
    ControllerResult<List<CreatePartitionsTopicResult>> outcome =
        manager.createPartitions(createPartitionsContext, List.of(growRequest));
    assertEquals(NONE.code(), outcome.response().get(0).errorCode());
    testContext.replay(outcome.records());

    // Only the newly added partitions (ids 1 and 2) are inspected: each must have a
    // single replica that is not the fenced broker, and a leader.
    for (int partitionId = 1; partitionId < 3; partitionId++) {
        PartitionRegistration registration =
            manager.getPartition(created.topicId(), partitionId);
        assertNotNull(registration, "Partition " + partitionId + " should exist");
        assertEquals(1, registration.replicas.length,
            "Partition " + partitionId + " should have RF=1");
        assertNotEquals(2, registration.replicas[0],
            "Partition " + partitionId + " should not be placed on fenced broker");
        assertTrue(registration.leader >= 0,
            "Partition " + partitionId + " should have a valid leader");
    }
}

@Test
public void testAddPartitionsManualAssignmentRejectedForUnmanaged() {
    // Diskless storage is enabled; managed replicas are not switched on in this builder.
    ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.latestTesting())
        .setDisklessStorageSystemEnabled(true)
        .build();
    ReplicationControlManager manager = testContext.replicationControl;
    testContext.registerBrokers(0, 1);
    testContext.unfenceBrokers(0, 1);

    // Seed a single-partition diskless topic with RF=1.
    final String topicName = "foo";
    testContext.createTestTopic(topicName, 1, (short) 1,
        Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

    // Ask for one extra partition pinned to broker 1; the explicit assignment must be refused.
    ControllerRequestContext createPartitionsContext =
        anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
    CreatePartitionsAssignment pinnedToBrokerOne =
        new CreatePartitionsAssignment().setBrokerIds(List.of(1));
    CreatePartitionsTopic growRequest = new CreatePartitionsTopic()
        .setName(topicName)
        .setCount(2)
        .setAssignments(List.of(pinnedToBrokerOne));
    ControllerResult<List<CreatePartitionsTopicResult>> outcome =
        manager.createPartitions(createPartitionsContext, List.of(growRequest));

    CreatePartitionsTopicResult topicOutcome = outcome.response().get(0);
    assertEquals(INVALID_REPLICA_ASSIGNMENT.code(), topicOutcome.errorCode());
    assertEquals("A manual partition assignment cannot be specified for diskless topics.",
        topicOutcome.errorMessage());
}
}

@Nested
Expand Down Expand Up @@ -5142,6 +5249,113 @@ public void testPeriodicLeaderBalancingSkipsDisklessTopics() {
"Periodic leader balancing should skip diskless topics");
}

@Test
public void testAddPartitionsAutoPlacement() {
    // Both diskless storage and managed replicas are enabled for this scenario.
    ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.latestTesting())
        .setDisklessStorageSystemEnabled(true)
        .setDisklessManagedReplicasEnabled(true)
        .build();
    ReplicationControlManager manager = testContext.replicationControl;
    testContext.registerBrokers(0, 1, 2);
    testContext.unfenceBrokers(0, 1, 2);

    // Seed a single-partition diskless topic whose replicas are brokers 0 and 1 (RF=2).
    final String topicName = "foo";
    int[][] initialAssignment = new int[][] {{0, 1}};
    CreatableTopicResult created = testContext.createTestTopic(
        topicName, initialAssignment,
        Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

    // Grow the topic to 3 partitions without supplying assignments (auto-placement).
    ControllerRequestContext createPartitionsContext =
        anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
    CreatePartitionsTopic growRequest = new CreatePartitionsTopic()
        .setName(topicName)
        .setCount(3)
        .setAssignments(null);
    ControllerResult<List<CreatePartitionsTopicResult>> outcome =
        manager.createPartitions(createPartitionsContext, List.of(growRequest));
    assertEquals(NONE.code(), outcome.response().get(0).errorCode());
    testContext.replay(outcome.records());

    // Every partition (old and new) must carry two replicas and a non-empty ISR.
    for (int partitionId = 0; partitionId < 3; partitionId++) {
        PartitionRegistration registration =
            manager.getPartition(created.topicId(), partitionId);
        assertNotNull(registration, "Partition " + partitionId + " should exist");
        assertEquals(2, registration.replicas.length,
            "Partition " + partitionId + " should have RF=2");
        assertTrue(registration.isr.length > 0,
            "Partition " + partitionId + " should have non-empty ISR");
    }
}

@Test
public void testAddPartitionsManualAssignment() {
    // Both diskless storage and managed replicas are enabled for this scenario.
    ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.latestTesting())
        .setDisklessStorageSystemEnabled(true)
        .setDisklessManagedReplicasEnabled(true)
        .build();
    ReplicationControlManager manager = testContext.replicationControl;
    testContext.registerBrokers(0, 1, 2);
    testContext.unfenceBrokers(0, 1, 2);

    // Seed a single-partition diskless topic whose replicas are brokers 0 and 1 (RF=2).
    final String topicName = "foo";
    int[][] initialAssignment = new int[][] {{0, 1}};
    CreatableTopicResult created = testContext.createTestTopic(
        topicName, initialAssignment,
        Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

    // Add one partition explicitly assigned to brokers 1 and 2.
    ControllerRequestContext createPartitionsContext =
        anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
    CreatePartitionsAssignment onBrokersOneAndTwo =
        new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2));
    CreatePartitionsTopic growRequest = new CreatePartitionsTopic()
        .setName(topicName)
        .setCount(2)
        .setAssignments(List.of(onBrokersOneAndTwo));
    ControllerResult<List<CreatePartitionsTopicResult>> outcome =
        manager.createPartitions(createPartitionsContext, List.of(growRequest));
    assertEquals(NONE.code(), outcome.response().get(0).errorCode());
    testContext.replay(outcome.records());

    // The new partition (id 1) must use exactly the requested replicas; both brokers
    // are unfenced here, so the ISR is expected to match the replica list.
    PartitionRegistration added = manager.getPartition(created.topicId(), 1);
    List<Integer> requestedReplicas = List.of(1, 2);
    assertEquals(requestedReplicas, Replicas.toList(added.replicas));
    assertEquals(requestedReplicas, Replicas.toList(added.isr));
}

@Test
public void testAddPartitionsIsrExcludesFencedBrokers() {
    // Both diskless storage and managed replicas are enabled for this scenario.
    ReplicationControlTestContext testContext = new ReplicationControlTestContext.Builder()
        .setMetadataVersion(MetadataVersion.latestTesting())
        .setDisklessStorageSystemEnabled(true)
        .setDisklessManagedReplicasEnabled(true)
        .build();
    ReplicationControlManager manager = testContext.replicationControl;
    testContext.registerBrokers(0, 1, 2);
    testContext.unfenceBrokers(0, 1, 2);

    // Seed a single-partition diskless topic whose replicas are brokers 0 and 1 (RF=2).
    final String topicName = "foo";
    int[][] initialAssignment = new int[][] {{0, 1}};
    CreatableTopicResult created = testContext.createTestTopic(
        topicName, initialAssignment,
        Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code());

    // Take broker 2 out of service before growing the topic.
    testContext.fenceBrokers(2);

    // Add one partition explicitly assigned to brokers 1 and 2, the latter being fenced.
    ControllerRequestContext createPartitionsContext =
        anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
    CreatePartitionsAssignment includesFencedBroker =
        new CreatePartitionsAssignment().setBrokerIds(List.of(1, 2));
    CreatePartitionsTopic growRequest = new CreatePartitionsTopic()
        .setName(topicName)
        .setCount(2)
        .setAssignments(List.of(includesFencedBroker));
    ControllerResult<List<CreatePartitionsTopicResult>> outcome =
        manager.createPartitions(createPartitionsContext, List.of(growRequest));
    assertEquals(NONE.code(), outcome.response().get(0).errorCode());
    testContext.replay(outcome.records());

    // The replica list keeps both requested brokers, while the ISR holds only broker 1.
    PartitionRegistration added = manager.getPartition(created.topicId(), 1);
    assertEquals(List.of(1, 2), Replicas.toList(added.replicas));
    assertEquals(List.of(1), Replicas.toList(added.isr));
}

@Test
public void testNoLeaderElectionOnBrokerFenced_noRacks() {
// With RF=1 (no racks), when the single replica is fenced, the leader goes offline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,13 @@ public class ServerConfigs {
// RF=-1 resolves to default.replication.factor. Explicit RF values (1, 2, 3, ...) are accepted.
// Placement uses standard rack-aware assignment.
// When disabled (default), diskless topics use legacy RF=1 behavior (RF=-1 resolves to 1, RF > 1 rejected).
// This config only affects topic creation.
// This config affects topic creation and add-partitions (manual assignments allowed when enabled).
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable";
public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false;
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " +
"with user-defined replication factor. RF=-1 resolves to default.replication.factor. " +
"Explicit RF values are accepted. Placement uses standard rack-aware assignment. " +
"When disabled, diskless topics use legacy RF=1 behavior. " +
"This config only affects topic creation.";
"When disabled, diskless topics use legacy RF=1 behavior.";

public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG = "classic.remote.storage.force.enable";
public static final boolean CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT = false;
Expand Down
Loading