From 02d3f40056e37d395dbd182d3d819d0be67d8d55 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 21 Jan 2026 16:53:01 +0200 Subject: [PATCH 1/5] test(metadata:diskless): improve test coverage for broker fencing and unregister scenarios - Add _noRacks and _withRacks test variants for consistent coverage - Fix tests that assumed broker 0 was always the leader - Get actual leader from partition registration before fencing/unregistering - Use dynamic assertions based on actual partition state - Improve assertion error messages for clarity --- .../ReplicationControlManagerTest.java | 136 +++++++++++++++--- 1 file changed, 119 insertions(+), 17 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 72b58a0a53d..aa678568d49 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -4234,9 +4234,9 @@ public void testReassignDisklessPartitions() { } @Test - public void testNoLeaderElectionOnBrokerFenced() { + public void testNoLeaderElectionOnBrokerFenced_noRacks() { // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. - // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + // Unmanaged diskless topics register a single replica as the leader, but it's not maintained. 
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setDisklessStorageSystemEnabled(true) .build(); @@ -4253,25 +4253,29 @@ public void testNoLeaderElectionOnBrokerFenced() { ); final Uuid topicId = createResult.topicId(); + // Get the actual leader before fencing + PartitionRegistration partitionBefore = replication.getPartition(topicId, 0); + int leader = partitionBefore.leader; + List records = new ArrayList<>(); - replication.handleBrokerFenced(0, records); + replication.handleBrokerFenced(leader, records); ctx.replay(records); PartitionRegistration partition = replication.getPartition(topicId, 0); assertNotNull(partition, "Partition should exist after leader fencing"); - assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica"); assertEquals(-1, partition.leader, "Leader should be offline after fencing"); } @Test - public void testNoReplicaChangeOnShutdown() { + public void testNoLeaderElectionOnBrokerFenced_withRacks() { // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. - // Currently, diskless topics ignored replica management. It registers a single replica as the leader, but it's not maintained. + // Unmanaged diskless topics register a single replica as the leader, but it's not maintained. 
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setDisklessStorageSystemEnabled(true) .build(); ReplicationControlManager replication = ctx.replicationControl; - ctx.registerBrokers(0, 1, 2); + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); ctx.unfenceBrokers(0, 1, 2); CreatableTopicResult createResult = ctx.createTestTopic( @@ -4283,16 +4287,88 @@ public void testNoReplicaChangeOnShutdown() { ); final Uuid topicId = createResult.topicId(); + // Get the actual leader before fencing + PartitionRegistration partitionBefore = replication.getPartition(topicId, 0); + int leader = partitionBefore.leader; + List records = new ArrayList<>(); - replication.handleBrokerShutdown(0, true, records); + replication.handleBrokerFenced(leader, records); ctx.replay(records); PartitionRegistration partition = replication.getPartition(topicId, 0); assertNotNull(partition, "Partition should exist after leader fencing"); - assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there was only one leader"); + assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica"); assertEquals(-1, partition.leader, "Leader should be offline after fencing"); } + @Test + public void testNoReplicaChangeOnShutdown_noRacks() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Unmanaged diskless topics register a single replica as the leader, but it's not maintained. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + // Get the actual leader before shutdown + PartitionRegistration partitionBefore = replication.getPartition(topicId, 0); + int leader = partitionBefore.leader; + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(leader, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader shutdown"); + assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica"); + assertEquals(-1, partition.leader, "Leader should be offline after shutdown"); + } + + @Test + public void testNoReplicaChangeOnShutdown_withRacks() { + // As there are no replicas to elect from, the leader should go offline but no new leader should be elected. + // Unmanaged diskless topics register a single replica as the leader, but it's not maintained. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + // Get the actual leader before shutdown + PartitionRegistration partitionBefore = replication.getPartition(topicId, 0); + int leader = partitionBefore.leader; + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(leader, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader shutdown"); + assertArrayEquals(new int[]{leader}, partition.isr, "ISR should remain unchanged as there is only one replica"); + assertEquals(-1, partition.leader, "Leader should be offline after shutdown"); + } + @Test void testDisklessMarksLeaderOfflineOnUnregister_noRacks() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() @@ -4312,18 +4388,31 @@ void testDisklessMarksLeaderOfflineOnUnregister_noRacks() { ); final Uuid topicId = createResult.topicId(); + // Identify partitions that have broker 0 as leader before unregistering + Set partitionsWithBroker0AsLeader = new HashSet<>(); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + if (partition.leader == 0) { + partitionsWithBroker0AsLeader.add(partitionId); + } + } + List records = new ArrayList<>(); replication.handleBrokerUnregistered(0, 100, records); ctx.replay(records); // All partitions should remain present and keep the original replica/ISR, - // only the leader should be 
marked offline. + // only leaders on broker 0 should be marked offline. for (int partitionId = 0; partitionId < numPartitions; partitionId++) { PartitionRegistration partition = replication.getPartition(topicId, partitionId); assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); - assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); - assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); - assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + assertEquals(1, partition.replicas.length, "Replicas should have 1 element for partition " + partitionId); + assertEquals(1, partition.isr.length, "ISR should have 1 element for partition " + partitionId); + if (partitionsWithBroker0AsLeader.contains(partitionId)) { + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId + " (was on broker 0)"); + } else { + assertTrue(partition.leader >= 0, "Leader should remain online for partition " + partitionId + " (was not on broker 0)"); + } } } @@ -4346,18 +4435,31 @@ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { ); final Uuid topicId = createResult.topicId(); + // Identify partitions that have broker 0 as leader before unregistering + Set partitionsWithBroker0AsLeader = new HashSet<>(); + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + if (partition.leader == 0) { + partitionsWithBroker0AsLeader.add(partitionId); + } + } + List records = new ArrayList<>(); replication.handleBrokerUnregistered(0, 100, records); ctx.replay(records); // All partitions should remain present and keep the original replica/ISR, - // only the leader should be marked offline. + // only leaders on broker 0 should be marked offline. 
for (int partitionId = 0; partitionId < numPartitions; partitionId++) { PartitionRegistration partition = replication.getPartition(topicId, partitionId); assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); - assertArrayEquals(new int[]{0}, partition.replicas, "Replicas should stay unchanged for partition " + partitionId); - assertArrayEquals(new int[]{0}, partition.isr, "ISR should stay unchanged for partition " + partitionId); - assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + assertEquals(1, partition.replicas.length, "Replicas should have 1 element for partition " + partitionId); + assertEquals(1, partition.isr.length, "ISR should have 1 element for partition " + partitionId); + if (partitionsWithBroker0AsLeader.contains(partitionId)) { + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId + " (was on broker 0)"); + } else { + assertTrue(partition.leader >= 0, "Leader should remain online for partition " + partitionId + " (was not on broker 0)"); + } } } From 15fe52678acd7168fdcd4d9539844f3b03e33150 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Tue, 20 Jan 2026 13:29:25 +0200 Subject: [PATCH 2/5] feat(controller:diskless): add server config for managed replicas Add diskless.managed.rf.enable config (default: false) to control whether diskless topics use managed replicas with RF=rack_count or legacy RF=1. This config only affects topic creation. When enabled, new diskless topics will be created with one replica per rack using standard KRaft placement. 
Part of Phase 1: Diskless Managed Replicas (See #478 docs/inkless/ts-unification/DISKLESS_MANAGED_RF.md) # Conflicts: # core/src/main/scala/kafka/server/ControllerServer.scala # core/src/main/scala/kafka/server/KafkaConfig.scala # metadata/src/main/java/org/apache/kafka/controller/QuorumController.java # server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java --- .../main/scala/kafka/server/ControllerServer.scala | 1 + core/src/main/scala/kafka/server/KafkaConfig.scala | 1 + .../apache/kafka/controller/QuorumController.java | 8 ++++++++ .../apache/kafka/server/config/ServerConfigs.java | 14 ++++++++++++++ 4 files changed, 24 insertions(+) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index b0755bd361a..db894c98ec6 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -240,6 +240,7 @@ class ControllerServer( setDefaultNumPartitions(config.numPartitions.intValue()). setDefaultDisklessEnable(config.logDisklessEnable). setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled). + setDisklessManagedReplicasEnabled(config.disklessManagedReplicasEnabled). setClassicRemoteStorageForceEnabled(config.classicRemoteStorageForceEnabled). setClassicRemoteStorageForceExcludeTopicRegexes(config.classicRemoteStorageForceExcludeTopicRegexes). 
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(), diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 29b3c2c1c98..1ce736bb363 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -425,6 +425,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG) + val disklessManagedReplicasEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG) val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG) val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] = getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index f8110170a0e..46f26092087 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -223,6 +223,7 @@ public static class Builder { private boolean defaultDisklessEnable = false; private boolean disklessStorageSystemEnabled = false; + private boolean disklessManagedReplicasEnabled = false; private boolean classicRemoteStorageForceEnabled = false; private List classicRemoteStorageForceExcludeTopicRegexes = List.of(); @@ -295,6 +296,11 @@ public Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnab return this; } + public Builder setDisklessManagedReplicasEnabled(boolean disklessManagedReplicasEnabled) { + 
this.disklessManagedReplicasEnabled = disklessManagedReplicasEnabled; + return this; + } + public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) { this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled; return this; @@ -454,6 +460,7 @@ public QuorumController build() throws Exception { defaultNumPartitions, defaultDisklessEnable, disklessStorageSystemEnabled, + disklessManagedReplicasEnabled, classicRemoteStorageForceEnabled, classicRemoteStorageForceExcludeTopicRegexes, replicaPlacer, @@ -1497,6 +1504,7 @@ private QuorumController( int defaultNumPartitions, boolean defaultDisklessEnable, boolean disklessStorageSystemEnabled, + boolean disklessManagedReplicasEnabled, boolean classicRemoteStorageForceEnabled, List classicRemoteStorageForceExcludeTopicRegexes, ReplicaPlacer replicaPlacer, diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index a875271f979..6970984e5c6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -142,6 +142,19 @@ public class ServerConfigs { "This should only be enabled in non-production environments for testing or migration purposes. " + "When enabled, topics can have their diskless.enable config changed from false to true."; + // When enabled, diskless topics are created with RF = rack_count (one replica per rack). + // If brokers are registered but none have a rack configured, RF defaults to 1. + // If no brokers are registered, topic creation fails with BROKER_NOT_AVAILABLE. + // When disabled (default), diskless topics use legacy RF=1 behavior. + // This config only affects topic creation. 
+ public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable"; + public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false; + public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " + + "with replication factor equal to the number of racks (one replica per rack). " + + "If brokers are registered but none have a rack configured, RF defaults to 1. " + + "When disabled, diskless topics use legacy RF=1 behavior. " + + "This config only affects topic creation."; + public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG = "classic.remote.storage.force.enable"; public static final boolean CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT = false; public static final String CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC = "Force classic topics to be created with remote.storage.enable=true, " + @@ -202,6 +215,7 @@ public class ServerConfigs { DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) .define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC) + .define(DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG, BOOLEAN, DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT, MEDIUM, DISKLESS_MANAGED_REPLICAS_ENABLE_DOC) .define(CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, BOOLEAN, CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DEFAULT, LOW, CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_DOC) .define(CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG, LIST, CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_DEFAULT, From 9a44b389dc284aba826d15736fa8019594eff963 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 21 Jan 2026 16:55:04 +0200 Subject: [PATCH 3/5] feat(metadata:diskless): implement managed replicas for diskless topics When diskless.managed.rf.enable=true, new diskless topics are created with RF=rack_count using standard KRaft replica placement instead of legacy RF=1. 
Changes: - Compute RF from rack cardinality via rackCardinality() - Use standard replicaPlacer.place() for rack-aware assignment - Allow manual replica assignments when managed replicas enabled - Add checkstyle suppression for extended createTopic method Phase 1 limitations: - Add Partitions inherits RF from existing partitions (Phase 3) - Transformer not updated, uses legacy routing (Phase 2) - Integration tests deferred to Phase 2 (See #478 docs/inkless/ts-unification/DISKLESS_MANAGED_RF.md)  Conflicts:  metadata/src/main/java/org/apache/kafka/controller/QuorumController.java  metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java  metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java --- checkstyle/suppressions.xml | 6 + .../kafka/controller/QuorumController.java | 1 + .../controller/ReplicationControlManager.java | 69 +- .../ReplicationControlManagerTest.java | 778 +++++++++++++++++- 4 files changed, 837 insertions(+), 17 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e73d2d0e9b2..0238cabda84 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -86,6 +86,9 @@ files="MemoryRecordsBuilder.java"/> + + @@ -318,6 +321,9 @@ files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/> + + classicRemoteStorageForceExcludeTopicRegexes = List.of(); @@ -201,6 +203,11 @@ public Builder setDisklessStorageSystemEnabled(boolean isDisklessStorageSystemEn return this; } + public Builder setDisklessManagedReplicasEnabled(boolean isDisklessManagedReplicasEnabled) { + this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled; + return this; + } + public Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) { this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled; return this; @@ 
-253,6 +260,7 @@ ReplicationControlManager build() { defaultNumPartitions, defaultDisklessEnable, isDisklessStorageSystemEnabled, + isDisklessManagedReplicasEnabled, classicRemoteStorageForceEnabled, classicRemoteStorageForceExcludeTopicRegexes, maxElectionsPerImbalance, @@ -332,9 +340,21 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti */ private final boolean defaultDisklessEnable; + /** + * When true, the diskless storage system is enabled, allowing diskless topics to be created. + */ private final boolean isDisklessStorageSystemEnabled; private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy; + /** + * When true, diskless topics use managed replicas with RF = rack_count (one replica per rack). + * When false, diskless topics use legacy RF=1 behavior. + * + *

Phase 1 limitation: This config only affects topic creation. Add Partitions inherits + * RF from existing partitions (correct behavior - maintains consistency within the topic). + */ + private final boolean isDisklessManagedReplicasEnabled; + /** * Maximum number of leader elections to perform during one partition leader balancing operation. */ @@ -425,6 +445,7 @@ private ReplicationControlManager( int defaultNumPartitions, boolean defaultDisklessEnable, boolean isDisklessStorageSystemEnabled, + boolean isDisklessManagedReplicasEnabled, boolean classicRemoteStorageForceEnabled, List classicRemoteStorageForceExcludeTopicRegexes, int maxElectionsPerImbalance, @@ -439,6 +460,7 @@ private ReplicationControlManager( this.defaultNumPartitions = defaultNumPartitions; this.defaultDisklessEnable = defaultDisklessEnable; this.isDisklessStorageSystemEnabled = isDisklessStorageSystemEnabled; + this.isDisklessManagedReplicasEnabled = isDisklessManagedReplicasEnabled; this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy( classicRemoteStorageForceEnabled, classicRemoteStorageForceExcludeTopicRegexes @@ -803,9 +825,13 @@ private ApiError createTopic(ControllerRequestContext context, "when the diskless storage system is disabled. " + "Please enable the diskless storage system to create diskless topics."); } + // Diskless RF validation: accept -1 (auto) or 1 (backward compat) only. + // Explicit RF > 1 rejected: users shouldn't need to know rack topology. + // Note: RF=1 is accepted for API backward compatibility, but when managed replicas + // are enabled, the actual RF is always computed from rack topology (rackCardinality). 
if (Math.abs(topic.replicationFactor()) != 1) { return new ApiError(Errors.INVALID_REPLICATION_FACTOR, - "Replication factor for diskless topics must be 1 or -1 to use the default value (1)."); + "Replication factor for diskless topics must be 1 or -1 (system-computed from rack topology)."); } } @@ -820,7 +846,7 @@ private ApiError createTopic(ControllerRequestContext context, "A manual partition assignment was specified, but numPartitions " + "was not set to -1."); } - if (disklessEnabled) { + if (disklessEnabled && !isDisklessManagedReplicasEnabled) { return new ApiError(INVALID_REQUEST, "A manual partition assignment cannot be specified for diskless topics."); } @@ -868,12 +894,19 @@ private ApiError createTopic(ControllerRequestContext context, } else { int numPartitions = topic.numPartitions() == -1 ? defaultNumPartitions : topic.numPartitions(); - short replicationFactor = topic.replicationFactor() == -1 ? - defaultReplicationFactor : topic.replicationFactor(); + short classicReplicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor : topic.replicationFactor(); + // For managed diskless: always use rackCardinality() regardless of requested RF. + // RF=1 in the request is accepted for backward compat but overridden here. + // Throws BrokerNotAvailableException or InvalidReplicationFactorException on failure, + // which are caught by the caller and converted to ApiError. + short disklessReplicationFactor = disklessEnabled && isDisklessManagedReplicasEnabled ? rackCardinality() : 1; + short replicationFactor = disklessEnabled ? 
disklessReplicationFactor : classicReplicationFactor; try { TopicAssignment topicAssignment; Predicate brokerFilter; - if (!disklessEnabled) { + // Diskless managed-replicas is equivalent to classic topic assignment, + // but RF is defined by number of racks + if (!disklessEnabled || isDisklessManagedReplicasEnabled) { topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec( 0, numPartitions, @@ -943,6 +976,32 @@ private ApiError createTopic(ControllerRequestContext context, return ApiError.NONE; } + /** + * Computes the replication factor for diskless topics based on rack topology. + * Returns the number of distinct racks in the cluster, ensuring one replica per rack. + * Brokers with no rack configured are all treated as belonging to a single logical rack, + * so if at least one broker is registered but none have a rack configured, the result is RF=1. + * + * @return the number of distinct racks as a short + * @throws BrokerNotAvailableException if no brokers are registered + * @throws InvalidReplicationFactorException if rack count exceeds Short.MAX_VALUE + */ + private short rackCardinality() { + final Collection brokerRegistrations = clusterControl.brokerRegistrations().values(); + final long racks = brokerRegistrations.stream() + .map(BrokerRegistration::rack) + .distinct() + .count(); + if (racks > Short.MAX_VALUE) { + // Unfeasible but technically possible scenario. + // Would require more than 32,768 brokers and each with a different rack + throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). 
Failing topic creation."); + } + if (racks == 0) + throw new BrokerNotAvailableException("No brokers available to create diskless topic."); + return (short) racks; + } + private boolean disklessEnabledOnTopicCreation(final Map creationConfigs) { final String disklessEnableConfigValue = creationConfigs.get(DISKLESS_ENABLE_CONFIG); final boolean disklessConfigEnabled; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index aa678568d49..eb1a0f382b9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -185,6 +185,7 @@ private static class Builder { private final Map staticConfig = new HashMap<>(); private boolean defaultDisklessEnable = false; private boolean disklessStorageSystemEnable = false; + private boolean disklessManagedReplicasEnable = false; private boolean classicRemoteStorageForceEnabled = false; private List classicRemoteStorageForceExcludeTopicRegexes = List.of(); @@ -223,6 +224,11 @@ Builder setDisklessStorageSystemEnabled(boolean disklessStorageSystemEnable) { return this; } + Builder setDisklessManagedReplicasEnabled(boolean disklessManagedReplicasEnable) { + this.disklessManagedReplicasEnable = disklessManagedReplicasEnable; + return this; + } + Builder setClassicRemoteStorageForceEnabled(final boolean classicRemoteStorageForceEnabled) { this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled; return this; @@ -241,6 +247,7 @@ ReplicationControlTestContext build() { staticConfig, defaultDisklessEnable, disklessStorageSystemEnable, + disklessManagedReplicasEnable, classicRemoteStorageForceEnabled, classicRemoteStorageForceExcludeTopicRegexes); } @@ -270,6 +277,7 @@ private ReplicationControlTestContext( Map staticConfig, boolean defaultDisklessEnable, boolean 
disklessStorageSystemEnable, + boolean disklessManagedReplicasEnable, final boolean classicRemoteStorageForceEnabled, final List classicRemoteStorageForceExcludeTopicRegexes ) { @@ -317,6 +325,7 @@ private ReplicationControlTestContext( setFeatureControl(featureControl). setDefaultDisklessEnable(defaultDisklessEnable). setDisklessStorageSystemEnabled(disklessStorageSystemEnable). + setDisklessManagedReplicasEnabled(disklessManagedReplicasEnable). setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled). setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes). build(); @@ -3786,11 +3795,11 @@ public void testNotCreateDisklessTopic(boolean logDisklessEnableServerConfig, St } @ParameterizedTest - @CsvSource({ + @CsvSource(value = { "true,true", - "true,", + "true,NULL", "false,true", - }) + }, nullValues = "NULL") public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setDefaultDisklessEnable(logDisklessEnableServerConfig) @@ -3894,11 +3903,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi } @ParameterizedTest - @CsvSource({ + @CsvSource(value = { "true,true", - "true,", + "true,NULL", "false,true", - }) + }, nullValues = "NULL") public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() .setDefaultDisklessEnable(logDisklessEnableServerConfig) @@ -4039,13 +4048,13 @@ public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short rep } @ParameterizedTest - @CsvSource({ + @CsvSource(value = { "true,false", - "true," + "true,NULL" // This case is not valid because no internal topic should be explicitly created with diskless enabled. 
// Tested in testInvalidDisklessTopicCreationForInternalTopics // "false,true", - }) + }, nullValues = "NULL") public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { // Given a setup with diskless defined at the server level ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() @@ -4129,10 +4138,10 @@ public void testInvalidDisklessTopicCreationForInternalTopics() { } @ParameterizedTest - @CsvSource({ + @CsvSource(value = { "false,true", - "true," - }) + "true,NULL" + }, nullValues = "NULL") public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { // Given a setup with diskless defined at the server level ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() @@ -4481,4 +4490,749 @@ void testManualReplicaAssignmentsShouldBeRejected() { } } + @Nested + // Tests Diskless managed-replicas + class DisklessManagedReplicasTests { + @ParameterizedTest + @CsvSource(value = { + "false,false", + "false,NULL", + "true,false", + }, nullValues = "NULL") + public void testCreatesClassicTopicWhenDisklessDisabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless disabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new 
CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // Given all brokers unfenced + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless disabled + ControllerResult result = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + // Then the topic creation should succeed as a classic topic with RF=3 + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) 3). + setErrorMessage(null).setErrorCode((short) 0). + setTopicId(result.response().topics().find("foo").topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + if (!disklessConfigRecords.isEmpty()) { + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + } + + // Given the topic is registered + ctx.replay(result.records()); + assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 0}). + setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00001DIRAAAA"), + Uuid.fromString("TESTBROKER00002DIRAAAA"), + Uuid.fromString("TESTBROKER00000DIRAAAA") + }). 
+ setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(), + replicationControl.getPartition( + ((TopicRecord) result.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless disabled and it already exists + ControllerResult result1 = + replicationControl.createTopics(requestContext, request, Collections.singleton("foo")); + CreateTopicsResponseData expectedResponse1 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse1.topics().add(new CreatableTopicResult().setName("foo"). + setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). + setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse1, result1.response()); + } + + @ParameterizedTest + @CsvSource(value = { + "true,true", + "true,NULL", + "false,true", + }, nullValues = "NULL") + public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic().setName("foo"). 
+ setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). + setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). + setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) 1). + setErrorMessage(null).setErrorCode((short) 0). + setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). 
+ setNumPartitions(1).setReplicationFactor((short) 1). + setErrorMessage(null).setErrorCode((short) 0). + setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {1}). + setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00001DIRAAAA"), + }). + setIsr(new int[] {1}) + .setLeader(1) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). + setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). 
+ setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource(value = { + "true,true", + "true,NULL", + "false,true", + }, nullValues = "NULL") + public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a request to create a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + + // When creating a topic without brokers available + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should fail with BROKER_NOT_AVAILABLE error + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). + setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). 
+ setErrorMessage("No brokers available to create diskless topic.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + + // Given brokers are registered + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0); + + // When creating a topic with diskless enabled + ControllerResult result2 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of fenced brokers + CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); + expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) 3). + setErrorMessage(null).setErrorCode((short) 0). + setTopicId(result2.response().topics().find("foo").topicId())); + CreateTopicsResponseData response = result2.response(); + assertEquals(expectedResponse2, withoutConfigs(response)); + + // Given all brokers unfenced + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + // When creating a topic with diskless enabled + ControllerResult result3 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + // Then the topic creation should succeed, regardless of the RF + CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); + expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). + setNumPartitions(1).setReplicationFactor((short) 3). + setErrorMessage(null).setErrorCode((short) 0). 
+ setTopicId(result3.response().topics().find("foo").topicId())); + assertEquals(expectedResponse3, withoutConfigs(result3.response())); + final List disklessConfigRecords = result3.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + assertEquals(1, disklessConfigRecords.size()); + // Then diskless is always enabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("true"))); + + // Given the topic is registered + ctx.replay(result3.records()); + assertEquals( + new PartitionRegistration.Builder().setReplicas(new int[] {0, 1, 2}). + setDirectories(new Uuid[] { + Uuid.fromString("TESTBROKER00000DIRAAAA"), + Uuid.fromString("TESTBROKER00001DIRAAAA"), + Uuid.fromString("TESTBROKER00002DIRAAAA"), + }). + setIsr(new int[] {0, 1, 2}) + .setLeader(0) + .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) + .setLeaderEpoch(0) + .setPartitionEpoch(0) + .build(), + replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + + // When creating a topic with diskless enabled and already exists + ControllerResult result4 = + replicationControl.createTopics(requestContext, request, Set.of("foo")); + CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); + // Then the topic creation should fail with TOPIC_ALREADY_EXISTS error + expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). + setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). 
+ setErrorMessage("Topic 'foo' already exists.")); + assertEquals(expectedResponse4, result4.response()); + } + + @ParameterizedTest + @CsvSource({ + "1, -2, INVALID_REPLICATION_FACTOR", + "1, 0, INVALID_REPLICATION_FACTOR", + "1, 2, INVALID_REPLICATION_FACTOR", + "-2, 1, INVALID_PARTITIONS", + "0, 1, INVALID_PARTITIONS", + }) + public void testCreateDisklessTopicWithInvalidInput(int numPartitions, short replicationFactor, String expectedError) { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + + CreateTopicsRequestData.CreatableTopicConfigCollection disklessConfig = + new CreateTopicsRequestData.CreatableTopicConfigCollection(); + disklessConfig.add( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName("diskless.enable") + .setValue("true") + ); + + CreateTopicsRequestData request1 = new CreateTopicsRequestData(); + request1.topics().add(new CreatableTopic().setName("baz") + .setNumPartitions(numPartitions).setReplicationFactor(replicationFactor) + .setConfigs(disklessConfig)); + + ControllerResult result1 = + replicationControl.createTopics(requestContext, request1, Set.of("baz")); + assertEquals(Errors.valueOf(expectedError).code(), result1.response().topics().find("baz").errorCode()); + assertEquals(List.of(), result1.records()); + } + + @ParameterizedTest + @CsvSource(value = { + "true,false", + "true,NULL" + // This case is not valid because no internal topic should be explicitly created with diskless enabled. 
+ // Tested in testInvalidDisklessTopicCreationForInternalTopics + // "false,true", + }, nullValues = "NULL") + public void testCreateInternalTopicWithDisklessEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection creatableTopicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + creatableTopicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + request.topics().add(new CreatableTopic().setName(internalTopic). 
+ setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(creatableTopicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless enabled, disable it + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should succeed with diskless disabled for internal topics + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setNumPartitions(1) + .setReplicationFactor((short) 3) + .setErrorMessage(null).setErrorCode((short) 0) + .setTopicId(result.response().topics().find(internalTopic).topicId())); + assertEquals(expectedResponse, withoutConfigs(result.response())); + assertTrue(result.response().topics().find(internalTopic) + .configs() + .stream() + .noneMatch(c -> c.name().equals(DISKLESS_ENABLE_CONFIG))); + final List disklessConfigRecords = result.records().stream() + .filter(m -> m.message() instanceof ConfigRecord) + .map(m -> (ConfigRecord) m.message()) + .filter(c -> c.name().equals(DISKLESS_ENABLE_CONFIG)) + .toList(); + // Then always diskless is disabled + assertTrue(disklessConfigRecords.stream().allMatch(c -> c.value().equals("false"))); + } + + @Test + public void testInvalidDisklessTopicCreationForInternalTopics() { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given an internal kafka topic with diskless enabled + final String internalTopic = Topic.GROUP_METADATA_TOPIC_NAME; + CreateTopicsRequestData 
request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection(); + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true")); + request.topics().add(new CreatableTopic().setName(internalTopic).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating an internal topic with diskless explicitly enabled, expect rejection + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(internalTopic)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with INVALID_REQUEST error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(internalTopic) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Internal topics cannot be diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @ParameterizedTest + @CsvSource(value = { + "false,true", + "true,NULL" + }, nullValues = "NULL") + public void testInvalidDisklessTopicCreationWithoutSystemEnabled(boolean logDisklessEnableServerConfig, String disklessEnableTopicConfig) { + // Given a setup with diskless defined at the server level + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(false) + .setDefaultDisklessEnable(logDisklessEnableServerConfig) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + // Given a kafka topic with diskless enabled + CreateTopicsRequestData request = new CreateTopicsRequestData(); + CreateTopicsRequestData.CreatableTopicConfigCollection topicConfigs = 
new CreateTopicsRequestData.CreatableTopicConfigCollection(); + if (disklessEnableTopicConfig != null) { + // If the diskless enable config is set, it should be added to the topic configs + topicConfigs.add(new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue(disklessEnableTopicConfig)); + } + final String topicName = "foo"; + request.topics().add(new CreatableTopic().setName(topicName).setConfigs(topicConfigs)); + // Given all brokers unfenced + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + // When creating a diskless topic while the diskless storage system is disabled + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + ControllerResult result = + replicationControl.createTopics(requestContext, request, Set.of(topicName)); + CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); + // Then the topic creation should fail with INVALID_REQUEST error + expectedResponse.topics().add( + new CreatableTopicResult() + .setName(topicName) + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Cannot create diskless topics when the diskless storage system is disabled. Please enable the diskless storage system to create diskless topics.")); + assertEquals(expectedResponse, withoutConfigs(result.response())); + } + + @Test + public void testReassignDisklessPartitions() { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1); + ctx.unfenceBrokers(0, 1); + + String topic = "foo"; + ctx.createTestTopic(topic, new int[][] {new int[] {0}}, Map.of(DISKLESS_ENABLE_CONFIG, "true"), (short) 0); + + // No change in the replication factor. 
+ ControllerResult alterResult1 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0).setReplicas(List.of(1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + alterResult1.response()); + + ctx.replay(alterResult1.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(List.of(new OngoingTopicReassignment().setName(topic).setPartitions(List.of( + new OngoingPartitionReassignment().setPartitionIndex(0) + .setRemovingReplicas(List.of(0)) + .setAddingReplicas(List.of(1)) + .setReplicas(List.of(1, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic). + setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + + // Try to increase the replication factor. 
+ ControllerResult alterResult2 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(List.of( + new ReassignableTopic().setName(topic).setPartitions(List.of( + new ReassignablePartition().setPartitionIndex(0) + .setReplicas(List.of(0, 1))))))); + assertEquals(new AlterPartitionReassignmentsResponseData() + .setErrorMessage(null).setResponses(List.of( + new ReassignableTopicResponse().setName(topic).setPartitions(List.of( + new ReassignablePartitionResponse().setPartitionIndex(0) + .setErrorCode(INVALID_REPLICATION_FACTOR.code()) + .setErrorMessage("The replication factor is changed from 1 to 2"))))), + alterResult2.response()); + ctx.replay(alterResult2.records()); + assertEquals(currentReassigning, replication.listPartitionReassignments(List.of( + new ListPartitionReassignmentsTopics().setName(topic) + .setPartitionIndexes(List.of(0))), Long.MAX_VALUE)); + } + + @Test + public void testNoLeaderElectionOnBrokerFenced_noRacks() { + // With RF=1 (no racks), when the single replica is fenced, the leader goes offline + // and no new leader can be elected since there are no other replicas. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + new int[][] { + new int[] {0} + }, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerFenced(0, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there is only one replica"); + assertEquals(-1, partition.leader, "Leader should be offline after fencing"); + } + + @Test + public void testLeaderElectionOnBrokerFenced_withRacks() { + // With RF=3 (racks), when the leader is fenced, a new leader should be elected + // from the remaining replicas in other racks. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + PartitionRegistration partitionBefore = replication.getPartition(topicId, 0); + int originalLeader = partitionBefore.leader; + + List records = new ArrayList<>(); + replication.handleBrokerFenced(originalLeader, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader fencing"); + assertEquals(3, partition.replicas.length, "Replicas should remain unchanged (RF=3)"); + assertEquals(2, partition.isr.length, "ISR should shrink by 1 after fencing the leader"); + assertNotEquals(originalLeader, partition.leader, "Leader should change after fencing"); + assertTrue(partition.leader >= 0, "A new leader should be elected from remaining replicas"); + } + + @Test + public void testNoReplicaChangeOnShutdown_noRacks() { + // With RF=1 (no racks), when the single replica is shutdown, the leader goes offline + // and no new leader can be elected since there are no other replicas. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + new int[][] { + new int[] {0} + }, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(0, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after leader shutdown"); + assertArrayEquals(new int[]{0}, partition.isr, "ISR should remain unchanged as there is only one replica"); + assertEquals(-1, partition.leader, "Leader should be offline after shutdown"); + } + + @Test + public void testReplicaChangeOnShutdown_withRacks() { + // With RF=3 (racks), when the leader broker is shutdown, the ISR shrinks and a new leader + // is elected from the remaining replicas in other racks. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + 1, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + // Capture the actual leader before shutdown + PartitionRegistration initialPartition = replication.getPartition(topicId, 0); + int originalLeader = initialPartition.leader; + + List records = new ArrayList<>(); + replication.handleBrokerShutdown(originalLeader, true, records); + ctx.replay(records); + + PartitionRegistration partition = replication.getPartition(topicId, 0); + assertNotNull(partition, "Partition should exist after broker shutdown"); + assertEquals(3, partition.replicas.length, "Replicas should remain unchanged (RF=3)"); + assertEquals(2, partition.isr.length, "ISR should shrink by 1 after shutdown"); + assertTrue(partition.leader >= 0, "A new leader should be elected from remaining replicas"); + assertNotEquals(originalLeader, partition.leader, "Leader should change after shutdown"); + } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_noRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new 
ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present and keep the original replica/ISR, + // only the leader should be marked offline if placed on the unregistered broker. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertEquals(1, partition.replicas.length, "Replicas [" + Arrays.toString(partition.replicas) + "] should stay unchanged for partition " + partitionId); + assertEquals(1, partition.isr.length, "ISR [" + Arrays.toString(partition.isr) + "] should stay unchanged for partition " + partitionId); + if (partition.preferredReplica() == 0) { + assertEquals(-1, partition.leader, "Leader should be offline for partition " + partitionId); + } else { + assertTrue(partition.leader > 0, "Leader should be online for partition " + partitionId); + } + } + // Sticking to keep partitions offline, as availability is managed by the Diskless metadata transformation + // with a fallback to "any node available"; not the KRaft registered metadata. + // Replicas will be reported as offline, so operators are aware of the underprovisioning, and can act on it. + // If they need to move the replicas, they can do that using regular tooling. 
+ } + + @Test + void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + final int numPartitions = 6; + CreatableTopicResult createResult = ctx.createTestTopic( + "foo", + numPartitions, + (short) 1, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + final Uuid topicId = createResult.topicId(); + + List records = new ArrayList<>(); + replication.handleBrokerUnregistered(0, 100, records); + ctx.replay(records); + + // All partitions should remain present with original replicas. + // ISR should shrink and leaders should move to other brokers. + for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + PartitionRegistration partition = replication.getPartition(topicId, partitionId); + assertNotNull(partition, "Partition " + partitionId + " should exist after broker unregistration"); + assertEquals(3, partition.replicas.length, "Replicas should stay unchanged for partition " + partitionId); + assertEquals(2, partition.isr.length, "ISR should shrink to 2 for partition " + partitionId); + assertTrue(partition.leader > 0, "Leader should remain online for partition " + partitionId); + } + } + + @Test + void testManualReplicaAssignmentsShouldBeAllowed() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + // Expectation: providing manual replica assignments for a diskless topic with managed-replicas should be allowed. 
+ ctx.createTestTopic( + "foo", + new int[][] {new int[] {0, 1}, new int[] {1, 2}}, + Map.of(DISKLESS_ENABLE_CONFIG, "true"), + NONE.code() + ); + } + } } From 68480d4fbffe18511dc151d730919d371bc6d59d Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 9 Mar 2026 13:42:56 -0500 Subject: [PATCH 4/5] fixup! feat(controller:diskless): add server config for managed replicas --- .../apache/kafka/server/config/ServerConfigs.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 6970984e5c6..9a69ba10d01 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -142,16 +142,16 @@ public class ServerConfigs { "This should only be enabled in non-production environments for testing or migration purposes. " + "When enabled, topics can have their diskless.enable config changed from false to true."; - // When enabled, diskless topics are created with RF = rack_count (one replica per rack). - // If brokers are registered but none have a rack configured, RF defaults to 1. - // If no brokers are registered, topic creation fails with BROKER_NOT_AVAILABLE. - // When disabled (default), diskless topics use legacy RF=1 behavior. + // When enabled, diskless topics are created with user-defined replication factor. + // RF=-1 resolves to default.replication.factor. Explicit RF values (1, 2, 3, ...) are accepted. + // Placement uses standard rack-aware assignment. + // When disabled (default), diskless topics use legacy RF=1 behavior (RF=-1 resolves to 1, RF > 1 rejected). // This config only affects topic creation. 
public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_CONFIG = "diskless.managed.rf.enable"; public static final boolean DISKLESS_MANAGED_REPLICAS_ENABLE_DEFAULT = false; public static final String DISKLESS_MANAGED_REPLICAS_ENABLE_DOC = "When enabled, new diskless topics are created " + - "with replication factor equal to the number of racks (one replica per rack). " + - "If brokers are registered but none have a rack configured, RF defaults to 1. " + + "with user-defined replication factor. RF=-1 resolves to default.replication.factor. " + + "Explicit RF values are accepted. Placement uses standard rack-aware assignment. " + "When disabled, diskless topics use legacy RF=1 behavior. " + "This config only affects topic creation."; From aed14b5449231cc66c1eb033acbc08fb164f120f Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 9 Mar 2026 13:43:38 -0500 Subject: [PATCH 5/5] fixup! feat(metadata:diskless): implement managed replicas for diskless topics --- .../controller/ReplicationControlManager.java | 50 ++----- .../ReplicationControlManagerTest.java | 141 ++++++++++++++---- 2 files changed, 119 insertions(+), 72 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 9c4a37595b0..1990b68b7e5 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; -import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.InvalidPartitionsException; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import 
When managed replicas are enabled: any valid RF is accepted
with a user-defined RF (or defaultReplicationFactor if RF=-1)
- // Would require more than 32,768 brokers and each with a different rack - throw new InvalidReplicationFactorException("Unexpected scenario: rack cardinality is not within short range (" + racks + "). Failing topic creation."); - } - if (racks == 0) - throw new BrokerNotAvailableException("No brokers available to create diskless topic."); - return (short) racks; - } - private boolean disklessEnabledOnTopicCreation(final Map creationConfigs) { final String disklessEnableConfigValue = creationConfigs.get(DISKLESS_ENABLE_CONFIG); final boolean disklessConfigEnabled; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index eb1a0f382b9..db543330445 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -4596,12 +4596,10 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should fail with BROKER_NOT_AVAILABLE error - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). 
- setErrorMessage("No brokers available to create diskless topic.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); + // Then the topic creation should fail with INVALID_REPLICATION_FACTOR + // (standard Kafka behavior when no brokers can satisfy the requested RF) + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + result.response().topics().find("foo").errorCode()); // Given brokers are registered ctx.registerBrokers(0, 1, 2); @@ -4610,10 +4608,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi // When creating a topic with diskless enabled ControllerResult result2 = replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of fenced brokers + // Then the topic creation should succeed with RF=3 (default.replication.factor), + // regardless of fenced brokers CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData(); expectedResponse2.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). + setNumPartitions(1).setReplicationFactor((short) 3). setErrorMessage(null).setErrorCode((short) 0). setTopicId(result2.response().topics().find("foo").topicId())); CreateTopicsResponseData response = result2.response(); @@ -4626,10 +4625,10 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi // When creating a topic with diskless enabled ControllerResult result3 = replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should succeed, regardless of the RF + // Then the topic creation should succeed with RF=3 (default.replication.factor) CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData(); expectedResponse3.topics().add(new CreatableTopicResult().setName("foo"). - setNumPartitions(1).setReplicationFactor((short) 1). + setNumPartitions(1).setReplicationFactor((short) 3). 
setErrorMessage(null).setErrorCode((short) 0). setTopicId(result3.response().topics().find("foo").topicId())); assertEquals(expectedResponse3, withoutConfigs(result3.response())); @@ -4644,18 +4643,11 @@ public void testCreateDisklessTopic_noRacks(boolean logDisklessEnableServerConfi // Given the topic is registered ctx.replay(result3.records()); - assertEquals( - new PartitionRegistration.Builder().setReplicas(new int[] {1}). - setDirectories(new Uuid[] { - Uuid.fromString("TESTBROKER00001DIRAAAA"), - }). - setIsr(new int[] {1}) - .setLeader(1) - .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED) - .setLeaderEpoch(0) - .setPartitionEpoch(0) - .build(), - replicationControl.getPartition(((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + PartitionRegistration partition = replicationControl.getPartition( + ((TopicRecord) result3.records().get(0).message()).topicId(), 0); + assertEquals(3, partition.replicas.length, "RF should be 3 (default.replication.factor)"); + assertEquals(3, partition.isr.length, "All brokers are active so ISR should equal replicas"); + assertTrue(partition.leader >= 0, "Leader should be elected"); // When creating a topic with diskless enabled and already exists ControllerResult result4 = @@ -4697,12 +4689,10 @@ public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerCon ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); ControllerResult result = replicationControl.createTopics(requestContext, request, Set.of("foo")); - // Then the topic creation should fail with BROKER_NOT_AVAILABLE error - CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.BROKER_NOT_AVAILABLE.code()). 
- setErrorMessage("No brokers available to create diskless topic.")); - assertEquals(expectedResponse, withoutConfigs(result.response())); + // Then the topic creation should fail with INVALID_REPLICATION_FACTOR + // (standard Kafka behavior when no brokers can satisfy the requested RF) + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + result.response().topics().find("foo").errorCode()); // Given brokers are registered ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); @@ -4775,7 +4765,6 @@ public void testCreateDisklessTopic_withRacks(boolean logDisklessEnableServerCon @CsvSource({ "1, -2, INVALID_REPLICATION_FACTOR", "1, 0, INVALID_REPLICATION_FACTOR", - "1, 2, INVALID_REPLICATION_FACTOR", "-2, 1, INVALID_PARTITIONS", "0, 1, INVALID_PARTITIONS", }) @@ -5049,7 +5038,7 @@ public void testLeaderElectionOnBrokerFenced_withRacks() { CreatableTopicResult createResult = ctx.createTestTopic( "foo", 1, - (short) 1, + (short) 3, Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code() ); @@ -5117,7 +5106,7 @@ public void testReplicaChangeOnShutdown_withRacks() { CreatableTopicResult createResult = ctx.createTestTopic( "foo", 1, - (short) 1, + (short) 3, Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code() ); @@ -5196,7 +5185,7 @@ void testDisklessMarksLeaderOfflineOnUnregister_withRacks() { CreatableTopicResult createResult = ctx.createTestTopic( "foo", numPartitions, - (short) 1, + (short) 3, Map.of(DISKLESS_ENABLE_CONFIG, "true"), NONE.code() ); @@ -5234,5 +5223,93 @@ void testManualReplicaAssignmentsShouldBeAllowed() { NONE.code() ); } + + @Test + void testCreateDisklessTopicWithExplicitRF() { + // Verify that explicit RF=2 is honored (not overridden to rack count or rejected). 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokersWithRacks(0, "a", 1, "b", 2, "c"); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo") + .setNumPartitions(2).setReplicationFactor((short) 2) + .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true") + ).iterator()))); + + ControllerResult result = + replication.createTopics(requestContext, request, Set.of("foo")); + assertEquals(NONE.code(), result.response().topics().find("foo").errorCode()); + assertEquals(2, result.response().topics().find("foo").replicationFactor()); + assertEquals(2, result.response().topics().find("foo").numPartitions()); + + ctx.replay(result.records()); + PartitionRegistration partition = replication.getPartition( + ((TopicRecord) result.records().get(0).message()).topicId(), 0); + assertEquals(2, partition.replicas.length, "RF=2 should be honored"); + assertEquals(2, partition.isr.length, "All replicas should be in ISR"); + } + + @Test + void testCreateDisklessTopicWithRFExceedingBrokerCount() { + // Verify that RF > broker count fails with standard Kafka error. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(true) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo") + .setNumPartitions(1).setReplicationFactor((short) 5) + .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true") + ).iterator()))); + + ControllerResult result = + replication.createTopics(requestContext, request, Set.of("foo")); + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + result.response().topics().find("foo").errorCode()); + } + + @Test + void testCreateDisklessTopicWithRFGreaterThanOneRejectedWhenManagedDisabled() { + // When managed replicas is disabled, RF > 1 should be rejected. 
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setDisklessStorageSystemEnabled(true) + .setDisklessManagedReplicasEnabled(false) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo") + .setNumPartitions(1).setReplicationFactor((short) 2) + .setConfigs(new CreateTopicsRequestData.CreatableTopicConfigCollection(List.of( + new CreateTopicsRequestData.CreatableTopicConfig() + .setName(DISKLESS_ENABLE_CONFIG) + .setValue("true") + ).iterator()))); + + ControllerResult result = + replication.createTopics(requestContext, request, Set.of("foo")); + assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), + result.response().topics().find("foo").errorCode()); + } } }