From 995b5fa817c4031178c3ae6583f9a3dc7ba5b13a Mon Sep 17 00:00:00 2001
From: Giuseppe Lillo
Date: Mon, 19 Jan 2026 15:47:35 +0100
Subject: [PATCH 1/5] Allow switching diskless.enable from false to true
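Allow a topic's `diskless.enable` config to be switched from false to true,
but only when the broker config `diskless.migration.enable` is set to true.
Switching `diskless.enable` from true back to false remains invalid.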
---
.../ControllerConfigurationValidator.scala | 3 +-
.../main/scala/kafka/server/KafkaConfig.scala | 1 +
.../java/kafka/server/InklessConfigsTest.java | 34 ++++++++++++++++++-
.../scala/unit/kafka/log/LogConfigTest.scala | 31 +++++++++++++++++
.../kafka/server/config/ServerConfigs.java | 8 +++++
.../storage/internals/log/LogConfig.java | 30 +++++++++++-----
6 files changed, 97 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index f163a2739ae..d5e944685da 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
- kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+ kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
+ kafkaConfig.disklessMigrationEnabled)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0cec30c06a0..81e91aa7316 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
+ val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG)
def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java
index 21548e98fdc..5c87cf4912f 100644
--- a/core/src/test/java/kafka/server/InklessConfigsTest.java
+++ b/core/src/test/java/kafka/server/InklessConfigsTest.java
@@ -68,7 +68,11 @@ public class InklessConfigsTest {
@Container
protected static MinioContainer s3Container = S3TestContainer.minio();
- private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception {
+ // Convenience overload: delegates to the three-argument init with diskless migration disabled (false).
+ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception {
+ return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false);
+ }
+
+ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception {
final TestKitNodes nodes = new TestKitNodes.Builder()
.setCombined(true)
.setNumBrokerNodes(1)
@@ -78,6 +82,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
.setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
.setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig))
+ .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig))
// PG control plane config
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName())
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl())
@@ -186,6 +191,33 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
cluster.close();
}
+    /**
+     * Verifies that, on a cluster started with {@code diskless.migration.enable=true}, a topic
+     * created with {@code diskless.enable=false} can be altered to {@code diskless.enable=true},
+     * and that altering it back to {@code false} afterwards is rejected.
+     *
+     * @throws Exception if the cluster cannot be started or an admin operation fails unexpectedly
+     */
+    @Test
+    public void disklessMigrationEnabled() throws Exception {
+        // Initialize cluster with diskless migration enabled
+        var cluster = init(false, true, true);
+        // Close the cluster in a finally block so that a failed assertion or an unexpected
+        // exception does not leave the test cluster running.
+        try {
+            Map<String, Object> clientConfigs = new HashMap<>();
+            clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+
+            try (Admin admin = AdminClient.create(clientConfigs)) {
+                // When creating a new topic with diskless.enable=false
+                final String classicTopic = "classicTopic";
+                createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"));
+                // Then diskless.enable is set to false in the topic config
+                var classicTopicConfig = getTopicConfig(admin, classicTopic);
+                assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG));
+
+                // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created
+                alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"));
+                // Verify the config was updated
+                var updatedTopicConfig = getTopicConfig(admin, classicTopic);
+                assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG));
+
+                // But it should still NOT be possible to turn off diskless after enabling it
+                assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
+            }
+        } finally {
+            cluster.close();
+        }
+    }
+
public void createTopic(Admin admin, String topic, Map configs) throws Exception {
admin.createTopics(Collections.singletonList(
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index cbe7ffeed41..2a7a51ead74 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -478,6 +478,37 @@ class LogConfigTest {
LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)
}
+ // Exercises the five-argument LogConfig.validate overload with isDisklessMigrationEnabled = true.
+ // Covers all four transitions of diskless.enable: false -> true (accepted), true -> false
+ // (rejected with InvalidConfigurationException), and both unchanged cases (accepted).
+ // The fourth argument (remote log storage system enabled) is false in every call.
+ @Test
+ def testDisklessMigrationEnabled(): Unit = {
+ val kafkaProps = TestUtils.createDummyBrokerConfig()
+ val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
+ val isDisklessMigrationEnabled = true
+
+ val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
+ val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
+
+ val setDisklessTrue = new Properties()
+ setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
+ val setDisklessFalse = new Properties()
+ setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")
+
+ // When migration is enabled:
+ // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true
+ LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
+
+ // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled)
+ assertThrows(
+ classOf[InvalidConfigurationException],
+ () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
+ )
+
+ // 3. Should still be possible to keep diskless.enable=true
+ LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
+
+ // 4. Should still be possible to keep diskless.enable=false
+ LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled)
+ }
+
@Test
def testValidDisklessAndRemoteStorageEnable(): Unit = {
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
index 2e53274549f..60db3015d4b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java
@@ -131,6 +131,12 @@ public class ServerConfigs {
public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " +
"This enables diskless topics alongside classic topics.";
+ public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable";
+ public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false;
+ public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " +
+ "This should only be enabled in non-production environments for testing or migration purposes. " +
+ "When enabled, topics can have their diskless.enable config changed from false to true.";
+
/************* Authorizer Configuration ***********/
public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name";
@@ -178,6 +184,8 @@ public class ServerConfigs {
/** Diskless Configurations **/
.define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH,
DISKLESS_STORAGE_SYSTEM_ENABLE_DOC)
+ .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW,
+ DISKLESS_MIGRATION_ENABLE_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 838ab49929e..56502942e67 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -506,16 +506,20 @@ public static void validateBrokerLogConfigValues(Map, ?> props,
private static void validateDiskless(Map existingConfigs,
Map, ?> newConfigs,
- boolean isRemoteLogStorageEnabled) {
+ boolean isRemoteLogStorageEnabled,
+ boolean isDisklessMigrationEnabled) {
Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean);
Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG))
.ifPresent(isBeingEnabled -> {
if (isBeingEnabled) {
- // diskless.enable=true -> diskless.enable must be already set to true
+ // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled)
if (wasDiskless.isPresent() && !wasDiskless.get()) {
- // cannot change from diskless.enable = false to diskless.enable = true
- throw new InvalidConfigurationException("It is invalid to enable diskless");
+ // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled)
+ if (!isDisklessMigrationEnabled) {
+ throw new InvalidConfigurationException("It is invalid to enable diskless");
+ }
+ // Migration is enabled, allow the change from false to true
}
if (isRemoteLogStorageEnabled) {
@@ -538,10 +542,12 @@ private static void validateDiskless(Map existingConfigs,
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
+ * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true)
*/
private static void validateTopicLogConfigValues(Map existingConfigs,
Map, ?> newConfigs,
- boolean isRemoteLogStorageSystemEnabled) {
+ boolean isRemoteLogStorageSystemEnabled,
+ boolean isDisklessMigrationEnabled) {
validateValues(newConfigs);
boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
@@ -557,7 +563,7 @@ private static void validateTopicLogConfigValues(Map existingCon
validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled);
}
- validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled);
+ validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled);
}
public static void validateTurningOffRemoteStorageWithDelete(Map, ?> newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) {
@@ -646,13 +652,21 @@ private static void validateRemoteStorageRetentionTime(Map, ?> props) {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
- validate(Map.of(), props, Map.of(), false);
+ validate(Map.of(), props, Map.of(), false, false);
}
public static void validate(Map existingConfigs,
Properties props,
Map, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled) {
+ validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, false);
+ }
+
+ public static void validate(Map existingConfigs,
+ Properties props,
+ Map, ?> configuredProps,
+ boolean isRemoteLogStorageSystemEnabled,
+ boolean isDisklessMigrationEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map, ?> valueMaps = CONFIG.parse(props);
@@ -661,7 +675,7 @@ public static void validate(Map existingConfigs,
Map