From c79963c7dcd03b9468e33e1d76750bebba0afcae Mon Sep 17 00:00:00 2001
From: Giuseppe Lillo
Date: Mon, 23 Feb 2026 15:48:17 +0100
Subject: [PATCH] feat(inkless): avoid switching from compacted to
non-compacted when ClassicTopicRemoteStorageForcePolicy is enabled
---
.../java/kafka/server/InklessConfigsTest.java | 53 +++++++++++++++++++
.../ClassicTopicRemoteStorageForcePolicy.java | 44 +++++++++++++++
.../ConfigurationControlManager.java | 32 ++++++++++-
.../kafka/controller/QuorumController.java | 2 +
.../ConfigurationControlManagerTest.java | 28 ++++++++++
5 files changed, 157 insertions(+), 2 deletions(-)
diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java
index 59dd2d0128a..24032888fa4 100644
--- a/core/src/test/java/kafka/server/InklessConfigsTest.java
+++ b/core/src/test/java/kafka/server/InklessConfigsTest.java
@@ -62,6 +62,7 @@
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
+import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -336,6 +337,58 @@ void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception {
cluster.close();
}
}
+
+ @Test
+ void changingCleanupPolicyFromCompactToDeleteRequiresRemoteStorageEnable() throws Exception {
+     // Altering only cleanup.policy (compact -> delete) is expected to be refused with this message.
+     final String topic = "compacted-to-delete-rejected";
+     final String expectedMessage =
+         "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
+         "when classic.remote.storage.force.enable is enabled.";
+
+     final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled();
+     final Map clientConfigs = new HashMap<>();
+     clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+
+     try (final Admin admin = AdminClient.create(clientConfigs)) {
+         // Create the topic requesting cleanup.policy=compact and remote storage set to "false".
+         createTopic(admin, topic, Map.of(
+             CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
+             REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"
+         ));
+
+         final ExecutionException failure = assertThrows(
+             ExecutionException.class,
+             () -> alterTopicConfig(admin, topic, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE))
+         );
+         // The rejection reason is read from the cause wrapped by the ExecutionException.
+         final Throwable rejection = failure.getCause();
+         assertEquals(expectedMessage, rejection.getMessage());
+     } finally {
+         cluster.close();
+     }
+ }
+
+ @Test
+ void changingCleanupPolicyFromCompactToDeleteWithRemoteStorageEnableIsAllowed() throws Exception {
+     final String topic = "compacted-to-delete-allowed";
+
+     final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled();
+     final Map clientConfigs = new HashMap<>();
+     clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+
+     try (final Admin admin = AdminClient.create(clientConfigs)) {
+         // Create the topic requesting cleanup.policy=compact and remote storage set to "false".
+         createTopic(admin, topic, Map.of(
+             CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
+             REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"
+         ));
+
+         // Switch to delete and set remote storage to "true" in the same alterTopicConfig call.
+         alterTopicConfig(admin, topic, Map.of(
+             CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE,
+             REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
+         ));
+
+         // Both altered values must be visible when the topic config is read back.
+         final Map configsAfterAlter = getTopicConfig(admin, topic);
+         assertEquals(CLEANUP_POLICY_DELETE, configsAfterAlter.get(CLEANUP_POLICY_CONFIG));
+         assertEquals("true", configsAfterAlter.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
+     } finally {
+         cluster.close();
+     }
+ }
}
public void createTopic(Admin admin, String topic, Map configs) throws Exception {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java b/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java
index 5bdb4364015..d345be35440 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForcePolicy.java
@@ -70,6 +70,31 @@ void maybeForceRemoteStorageEnable(
}
}
+ /**
+  * Rejects a topic config change that moves cleanup.policy from a value containing compact to
+  * delete-only unless the resulting configs enable diskless or remote storage.
+  *
+  * <p>Nothing is checked when this policy is not enabled, or when {@code isInternalTopic} or
+  * {@code topicExcludedByRegex} holds for the topic name.
+  *
+  * <p>NOTE(review): a target without a cleanup.policy entry is never treated as delete-only here
+  * (cleanupPolicyIsDeleteOnly returns false for a missing key) - confirm that removing the
+  * topic-level override cannot fall back to a delete default and sidestep this check.
+  *
+  * @param topicName       name of the topic whose configs are being altered
+  * @param targetConfigs   topic configs as they would be after the change
+  * @param existingConfigs topic configs before the change
+  * @throws org.apache.kafka.common.errors.InvalidConfigurationException if the transition is
+  *         compact to delete-only and neither {@link TopicConfig#DISKLESS_ENABLE_CONFIG} nor
+  *         {@link TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG} parses to true in the target
+  */
+ void validateCompactedToDeleteCleanupPolicyTransition(
+     final String topicName,
+     final Map targetConfigs,
+     final Map existingConfigs
+ ) {
+     // Short-circuit order mirrors the guard sequence: enabled, internal, excluded.
+     final boolean policyApplies = enabled
+         && !isInternalTopic(topicName)
+         && !topicExcludedByRegex(topicName);
+     final boolean compactToDeleteOnly = policyApplies
+         && cleanupPolicyContainsCompact(existingConfigs)
+         && cleanupPolicyIsDeleteOnly(targetConfigs);
+     if (!compactToDeleteOnly) {
+         return;
+     }
+
+     // Either storage flag being true in the target configs makes the transition acceptable.
+     final boolean targetIsDiskless =
+         Boolean.parseBoolean(targetConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG));
+     final boolean targetHasRemoteStorage =
+         Boolean.parseBoolean(targetConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG));
+     if (targetIsDiskless || targetHasRemoteStorage) {
+         return;
+     }
+
+     throw new org.apache.kafka.common.errors.InvalidConfigurationException(
+         "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
+         "when classic.remote.storage.force.enable is enabled.");
+ }
+
private boolean shouldForceRemoteStorageEnable(
final String topicName,
final boolean disklessEnabled,
@@ -100,6 +125,25 @@ private boolean cleanupPolicyContainsCompact(final Map topicConf
return false;
}
+ /**
+  * Tells whether the cleanup.policy value in the given configs consists solely of delete.
+  *
+  * <p>The value is split on commas; entries are trimmed and blank entries are ignored. The
+  * result is true when at least one entry remains and every remaining entry equals
+  * {@link TopicConfig#CLEANUP_POLICY_DELETE}. A repeated value such as {@code "delete,delete"}
+  * therefore still counts as delete-only, so duplicating the entry cannot be used to slip past
+  * the compact-to-delete check.
+  *
+  * @param topicConfigs topic configs to inspect
+  * @return true if cleanup.policy is present and names only delete; false if the key is
+  *         missing, has no non-blank entry, or names any other policy
+  */
+ private boolean cleanupPolicyIsDeleteOnly(final Map topicConfigs) {
+     final String cleanupPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG);
+     if (cleanupPolicy == null) {
+         return false;
+     }
+     boolean sawDelete = false;
+     for (String policy : cleanupPolicy.split(",")) {
+         final String trimmedPolicy = policy.trim();
+         if (trimmedPolicy.isEmpty()) {
+             continue;
+         }
+         if (!TopicConfig.CLEANUP_POLICY_DELETE.equals(trimmedPolicy)) {
+             // Any policy other than delete means the value is not delete-only.
+             return false;
+         }
+         sawDelete = true;
+     }
+     // An empty or all-blank value names no policy at all, so it is not delete-only.
+     return sawDelete;
+ }
+
private boolean topicExcludedByRegex(final String topicName) {
for (Pattern pattern : excludeTopicPatterns) {
if (pattern.matcher(topicName).matches()) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index d765bc067ff..3f72cec5ab3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -77,6 +77,7 @@ public class ConfigurationControlManager {
private final Map staticConfig;
private final ConfigResource currentController;
private final FeatureControlManager featureControl;
+ private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy;
static class Builder {
private LogContext logContext = null;
@@ -88,6 +89,8 @@ static class Builder {
private Map staticConfig = Map.of();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
+ private boolean classicRemoteStorageForceEnabled = false;
+ private List classicRemoteStorageForceExcludeTopicRegexes = List.of();
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -134,6 +137,16 @@ Builder setFeatureControl(FeatureControlManager featureControl) {
return this;
}
+ /**
+ * Sets the enabled flag that build() passes on to the ConfigurationControlManager, which uses it
+ * to construct its ClassicTopicRemoteStorageForcePolicy. Defaults to false when not called.
+ *
+ * @return this builder
+ */
+ Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
+ this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
+ return this;
+ }
+
+ /**
+ * Sets the exclude-topic regex list that build() passes on to the ConfigurationControlManager,
+ * which uses it to construct its ClassicTopicRemoteStorageForcePolicy. Defaults to an empty list
+ * when not called. The list reference is stored as given (no copy is made here).
+ *
+ * @return this builder
+ */
+ Builder setClassicRemoteStorageForceExcludeTopicRegexes(List classicRemoteStorageForceExcludeTopicRegexes) {
+ this.classicRemoteStorageForceExcludeTopicRegexes = classicRemoteStorageForceExcludeTopicRegexes;
+ return this;
+ }
+
ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
@@ -152,7 +165,9 @@ ConfigurationControlManager build() {
validator,
staticConfig,
nodeId,
- featureControl);
+ featureControl,
+ classicRemoteStorageForceEnabled,
+ classicRemoteStorageForceExcludeTopicRegexes);
}
}
@@ -164,7 +179,9 @@ private ConfigurationControlManager(LogContext logContext,
ConfigurationValidator validator,
Map staticConfig,
int nodeId,
- FeatureControlManager featureControl
+ FeatureControlManager featureControl,
+ boolean classicRemoteStorageForceEnabled,
+ List classicRemoteStorageForceExcludeTopicRegexes
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
@@ -177,6 +194,10 @@ private ConfigurationControlManager(LogContext logContext,
this.staticConfig = Map.copyOf(staticConfig);
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.featureControl = featureControl;
+ this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy(
+ classicRemoteStorageForceEnabled,
+ classicRemoteStorageForceExcludeTopicRegexes
+ );
}
SnapshotRegistry snapshotRegistry() {
@@ -373,6 +394,13 @@ private ApiError validateAlterConfig(
// in the list passed to the policy in order to maintain backwards compatibility
}
try {
+ if (configResource.type() == Type.TOPIC) {
+ classicTopicRemoteStorageForcePolicy.validateCompactedToDeleteCleanupPolicyTransition(
+ configResource.name(),
+ allConfigs,
+ existingConfigsMap
+ );
+ }
validator.validate(configResource, allConfigs, existingConfigsMap);
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index f8110170a0e..198772bfc30 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1569,6 +1569,8 @@ private QuorumController(
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
+ setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled).
+ setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes).
setFeatureControl(featureControl).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 2c93d1100ec..50580103976 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -219,6 +219,34 @@ public void testIncrementalAlterConfig() {
invalidConfigValueResult.response().message());
}
+ @Test
+ public void testCleanupPolicyCompactToDeleteRejectedWhenClassicRemoteStorageForceEnabled() {
+     final String expectedMessage =
+         "It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
+         "when classic.remote.storage.force.enable is enabled.";
+
+     // Manager built with the classic remote storage force flag switched on.
+     ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
+         setFeatureControl(createFeatureControlManager()).
+         setKafkaConfigSchema(SCHEMA).
+         setClassicRemoteStorageForceEnabled(true).
+         build();
+
+     // Seed "mytopic" with cleanup.policy=compact and remote storage set to "false".
+     ConfigRecord compactPolicy = new ConfigRecord()
+         .setResourceType(TOPIC.id())
+         .setResourceName("mytopic")
+         .setName(TopicConfig.CLEANUP_POLICY_CONFIG)
+         .setValue(TopicConfig.CLEANUP_POLICY_COMPACT);
+     ConfigRecord remoteStorageOff = new ConfigRecord()
+         .setResourceType(TOPIC.id())
+         .setResourceName("mytopic")
+         .setName(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)
+         .setValue("false");
+     RecordTestUtils.replayAll(manager, List.of(
+         new ApiMessageAndVersion(compactPolicy, (short) 0),
+         new ApiMessageAndVersion(remoteStorageOff, (short) 0)
+     ));
+
+     // Only cleanup.policy is altered, so remote storage stays at "false" in the target configs.
+     ControllerResult result = manager.incrementalAlterConfig(
+         MYTOPIC,
+         Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, entry(SET, TopicConfig.CLEANUP_POLICY_DELETE)),
+         false
+     );
+
+     assertEquals(Errors.INVALID_CONFIG, result.response().error());
+     assertEquals(expectedMessage, result.response().message());
+ }
+
@Test
public void testIncrementalAlterMultipleConfigValues() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().