From c79963c7dcd03b9468e33e1d76750bebba0afcae Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Mon, 23 Feb 2026 15:48:17 +0100 Subject: [PATCH] feat(inkless): avoid switching from compacted to non-compacted when ClassicTopicRemoteStorageForcePolicy is enabled --- .../java/kafka/server/InklessConfigsTest.java | 53 +++++++++++++++++++ .../ClassicTopicRemoteStorageForcePolicy.java | 44 +++++++++++++++ .../ConfigurationControlManager.java | 32 ++++++++++- .../kafka/controller/QuorumController.java | 2 + .../ConfigurationControlManagerTest.java | 28 ++++++++++ 5 files changed, 157 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 59dd2d0128a..24032888fa4 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -62,6 +62,7 @@ import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG; import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -336,6 +337,58 @@ void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception { cluster.close(); } } + + @Test + void changingCleanupPolicyFromCompactToDeleteRequiresRemoteStorageEnable() throws Exception { + final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled(); + final Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (final Admin admin = AdminClient.create(clientConfigs)) { + final String compactedTopic = "compacted-to-delete-rejected"; + createTopic(admin, compactedTopic, Map.of( + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT, + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false" + )); + + final ExecutionException exception = assertThrows( + ExecutionException.class, + () -> alterTopicConfig(admin, compactedTopic, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE)) + ); + assertEquals( + "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " + + "when classic.remote.storage.force.enable is enabled.", + exception.getCause().getMessage() + ); + } finally { + cluster.close(); + } + } + + @Test + void changingCleanupPolicyFromCompactToDeleteWithRemoteStorageEnableIsAllowed() throws Exception { + final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled(); + final Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (final Admin admin = AdminClient.create(clientConfigs)) { + final String compactedTopic = "compacted-to-delete-allowed"; + createTopic(admin, compactedTopic, Map.of( + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT, + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false" + )); + alterTopicConfig(admin, compactedTopic, Map.of( + CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE, + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + )); + + final Map alteredTopicConfig = getTopicConfig(admin, compactedTopic); + assertEquals(CLEANUP_POLICY_DELETE, alteredTopicConfig.get(CLEANUP_POLICY_CONFIG)); + assertEquals("true", alteredTopicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG)); + } finally { + cluster.close(); + } + } } public void createTopic(Admin admin, String topic, Map configs) throws Exception { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java b/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java index 5bdb4364015..d345be35440 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java @@ -70,6 +70,31 @@ void maybeForceRemoteStorageEnable( } } + void validateCompactedToDeleteCleanupPolicyTransition( + final String topicName, + final Map targetConfigs, + final Map existingConfigs + ) { + if (!enabled + || isInternalTopic(topicName) + || topicExcludedByRegex(topicName)) { + return; + } + if (!cleanupPolicyContainsCompact(existingConfigs)) { + return; + } + if (!cleanupPolicyIsDeleteOnly(targetConfigs)) { + return; + } + final boolean disklessEnabled = Boolean.parseBoolean(targetConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)); + final boolean remoteStorageEnabled = Boolean.parseBoolean(targetConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)); + if (!disklessEnabled && !remoteStorageEnabled) { + throw new org.apache.kafka.common.errors.InvalidConfigurationException( + "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " + + "when classic.remote.storage.force.enable is enabled."); + } + } + private boolean shouldForceRemoteStorageEnable( final String topicName, final boolean disklessEnabled, @@ -100,6 +125,25 @@ private boolean cleanupPolicyContainsCompact(final Map topicConf return false; } + private boolean cleanupPolicyIsDeleteOnly(final Map topicConfigs) { + final String cleanupPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG); + if (cleanupPolicy == null) { + return false; + } + int nonEmptyPolicies = 0; + for (String policy : cleanupPolicy.split(",")) { + final String trimmedPolicy = policy.trim(); + if (trimmedPolicy.isEmpty()) { + continue; + } + nonEmptyPolicies++; + if (!TopicConfig.CLEANUP_POLICY_DELETE.equals(trimmedPolicy)) { + return false; + } + } + return nonEmptyPolicies == 1; + } + private boolean topicExcludedByRegex(final String topicName) { for (Pattern pattern : excludeTopicPatterns) { if (pattern.matcher(topicName).matches()) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index d765bc067ff..3f72cec5ab3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -77,6 +77,7 @@ public class ConfigurationControlManager { private final Map staticConfig; private final ConfigResource currentController; private final FeatureControlManager featureControl; + private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy; static class Builder { private LogContext logContext = null; @@ -88,6 +89,8 @@ static class Builder { private Map staticConfig = Map.of(); private int nodeId = 0; private FeatureControlManager featureControl = null; + private boolean classicRemoteStorageForceEnabled = false; + private List classicRemoteStorageForceExcludeTopicRegexes = List.of(); Builder setLogContext(LogContext logContext) { this.logContext = logContext; @@ -134,6 +137,16 @@ Builder setFeatureControl(FeatureControlManager featureControl) { return this; } + Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) { + this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled; + return this; + } + + Builder setClassicRemoteStorageForceExcludeTopicRegexes(List classicRemoteStorageForceExcludeTopicRegexes) { + this.classicRemoteStorageForceExcludeTopicRegexes = classicRemoteStorageForceExcludeTopicRegexes; + return this; + } + ConfigurationControlManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); @@ -152,7 +165,9 @@ ConfigurationControlManager build() { validator, staticConfig, nodeId, - featureControl); + featureControl, + classicRemoteStorageForceEnabled, + classicRemoteStorageForceExcludeTopicRegexes); } } @@ -164,7 +179,9 @@ private ConfigurationControlManager(LogContext logContext, ConfigurationValidator validator, Map staticConfig, int nodeId, - FeatureControlManager featureControl + FeatureControlManager featureControl, + boolean classicRemoteStorageForceEnabled, + List classicRemoteStorageForceExcludeTopicRegexes ) { this.log = logContext.logger(ConfigurationControlManager.class); this.snapshotRegistry = snapshotRegistry; @@ -177,6 +194,10 @@ private ConfigurationControlManager(LogContext logContext, this.staticConfig = Map.copyOf(staticConfig); this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId)); this.featureControl = featureControl; + this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy( + classicRemoteStorageForceEnabled, + classicRemoteStorageForceExcludeTopicRegexes + ); } SnapshotRegistry snapshotRegistry() { @@ -373,6 +394,13 @@ private ApiError validateAlterConfig( // in the list passed to the policy in order to maintain backwards compatibility } try { + if (configResource.type() == Type.TOPIC) { + classicTopicRemoteStorageForcePolicy.validateCompactedToDeleteCleanupPolicyTransition( + configResource.name(), + allConfigs, + existingConfigsMap + ); + } validator.validate(configResource, allConfigs, existingConfigsMap); if (!newlyCreatedResource) { existenceChecker.accept(configResource); diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index f8110170a0e..198772bfc30 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1569,6 +1569,8 @@ private QuorumController( setValidator(configurationValidator). setStaticConfig(staticConfig). setNodeId(nodeId). + setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled). + setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes). setFeatureControl(featureControl). build(); this.producerIdControlManager = new ProducerIdControlManager.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 2c93d1100ec..50580103976 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -219,6 +219,34 @@ public void testIncrementalAlterConfig() { invalidConfigValueResult.response().message()); } + @Test + public void testCleanupPolicyCompactToDeleteRejectedWhenClassicRemoteStorageForceEnabled() { + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setFeatureControl(createFeatureControlManager()). + setKafkaConfigSchema(SCHEMA). + setClassicRemoteStorageForceEnabled(true). + build(); + + RecordTestUtils.replayAll(manager, List.of( + new ApiMessageAndVersion(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic") + .setName(TopicConfig.CLEANUP_POLICY_CONFIG).setValue(TopicConfig.CLEANUP_POLICY_COMPACT), (short) 0), + new ApiMessageAndVersion(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic") + .setName(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).setValue("false"), (short) 0) + )); + + ControllerResult result = manager.incrementalAlterConfig( + MYTOPIC, + Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, entry(SET, TopicConfig.CLEANUP_POLICY_DELETE)), + false + ); + assertEquals(Errors.INVALID_CONFIG, result.response().error()); + assertEquals( + "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " + + "when classic.remote.storage.force.enable is enabled.", + result.response().message() + ); + } + @Test public void testIncrementalAlterMultipleConfigValues() { ConfigurationControlManager manager = new ConfigurationControlManager.Builder().