From 995b5fa817c4031178c3ae6583f9a3dc7ba5b13a Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Mon, 19 Jan 2026 15:47:35 +0100 Subject: [PATCH 1/5] Allow switching diskless.enable from false to true `diskless.enable` for a topic can be switched from false to true, only if `diskless.migration.enable` is enabled. --- .../ControllerConfigurationValidator.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 1 + .../java/kafka/server/InklessConfigsTest.java | 34 ++++++++++++++++++- .../scala/unit/kafka/log/LogConfigTest.scala | 31 +++++++++++++++++ .../kafka/server/config/ServerConfigs.java | 8 +++++ .../storage/internals/log/LogConfig.java | 30 +++++++++++----- 6 files changed, 97 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f163a2739ae..d5e944685da 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), + kafkaConfig.disklessMigrationEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0cec30c06a0..81e91aa7316 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) + val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 21548e98fdc..5c87cf4912f 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -68,7 +68,11 @@ public class InklessConfigsTest { @Container protected static MinioContainer s3Container = S3TestContainer.minio(); - private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception { + return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false); + } + + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) @@ -78,6 +82,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) + .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig)) // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -186,6 +191,33 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception { cluster.close(); } + @Test + public void disklessMigrationEnabled() throws Exception { + // Initialize cluster with diskless migration enabled + var cluster = init(false, true, true); + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (Admin admin = AdminClient.create(clientConfigs)) { + // When creating a new topic with diskless.enable=false + final String classicTopic = "classicTopic"; + createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // Then diskless.enable is set to false in the topic config + var classicTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created + alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + // Verify the config was updated + var updatedTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // But it should still NOT be possible to turn off diskless after enabling it + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); + } + cluster.close(); + } + public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index cbe7ffeed41..2a7a51ead74 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -478,6 +478,37 @@ class LogConfigTest { LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false) } + @Test + def testDisklessMigrationEnabled(): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) + val isDisklessMigrationEnabled = true + + val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + val setDisklessTrue = new Properties() + setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + val setDisklessFalse = new Properties() + setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + + // When migration is enabled: + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true + LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + ) + + // 3. Should still be possible to keep diskless.enable=true + LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + + // 4. Should still be possible to keep diskless.enable=false + LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + } + @Test def testValidDisklessAndRemoteStorageEnable(): Unit = { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 2e53274549f..60db3015d4b 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -131,6 +131,12 @@ public class ServerConfigs { public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " + "This enables diskless topics alongside classic topics."; + public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable"; + public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false; + public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " + + "This should only be enabled in non-production environments for testing or migration purposes. " + + "When enabled, topics can have their diskless.enable config changed from false to true."; + /************* Authorizer Configuration ***********/ public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name"; @@ -178,6 +184,8 @@ public class ServerConfigs { /** Diskless Configurations **/ .define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH, DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) + .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW, + DISKLESS_MIGRATION_ENABLE_DOC) /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 838ab49929e..56502942e67 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -506,16 +506,20 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageEnabled) { + boolean isRemoteLogStorageEnabled, + boolean isDisklessMigrationEnabled) { Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true + // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled) if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true - throw new InvalidConfigurationException("It is invalid to enable diskless"); + // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled) + if (!isDisklessMigrationEnabled) { + throw new InvalidConfigurationException("It is invalid to enable diskless"); + } + // Migration is enabled, allow the change from false to true } if (isRemoteLogStorageEnabled) { @@ -538,10 +542,12 @@ private static void validateDiskless(Map existingConfigs, * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled + * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateValues(newConfigs); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -557,7 +563,7 @@ private static void validateTopicLogConfigValues(Map existingCon validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } - validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled); + validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled); } public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { @@ -646,13 +652,21 @@ private static void validateRemoteStorageRetentionTime(Map props) { * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ public static void validate(Properties props) { - validate(Map.of(), props, Map.of(), false); + validate(Map.of(), props, Map.of(), false, false); } public static void validate(Map existingConfigs, Properties props, Map configuredProps, boolean isRemoteLogStorageSystemEnabled) { + validate(existingConfigs, props, configuredProps, isRemoteLogStorageSystemEnabled, false); + } + + public static void validate(Map existingConfigs, + Properties props, + Map configuredProps, + boolean isRemoteLogStorageSystemEnabled, + boolean isDisklessMigrationEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -661,7 +675,7 @@ public static void validate(Map existingConfigs, Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled); } } From 095d0ea425f20b601202cfb3fe8a6205a13e49f6 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Wed, 21 Jan 2026 18:10:46 +0100 Subject: [PATCH 2/5] fixup! Allow migration only for tiered classic topics and rename to `diskless.allow.from.classic.enable` --- .../ControllerConfigurationValidator.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 +- .../java/kafka/server/InklessConfigsTest.java | 59 +++++++++++---- .../scala/unit/kafka/log/LogConfigTest.scala | 75 +++++++++++++++---- .../kafka/server/config/ServerConfigs.java | 10 +-- .../storage/internals/log/LogConfig.java | 24 +++--- 6 files changed, 123 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index d5e944685da..07c544838bc 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -119,7 +119,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), - kafkaConfig.disklessMigrationEnabled) + kafkaConfig.disklessAllowFromClassicEnabled) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 81e91aa7316..8478f2f0de6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -428,7 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) /** Diskless Configuration */ val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG) - val disklessMigrationEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG) + val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG) def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index 5c87cf4912f..f5153b79d56 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,6 +59,7 @@ import io.aiven.inkless.test_utils.S3TestContainer; import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -72,17 +74,20 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false); } - private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean disklessMigrationEnableConfig) throws Exception { + private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled) throws Exception { final TestKitNodes nodes = new TestKitNodes.Builder() .setCombined(true) .setNumBrokerNodes(1) .setNumControllerNodes(1) .build(); - var cluster = new KafkaClusterTestKit.Builder(nodes) + var builder = new KafkaClusterTestKit.Builder(nodes) .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) - .setConfigProp(ServerConfigs.DISKLESS_MIGRATION_ENABLE_CONFIG, String.valueOf(disklessMigrationEnableConfig)) + .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled)) + .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") + .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -96,8 +101,9 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .build(); + .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()); + + var cluster = builder.build(); cluster.format(); cluster.startup(); cluster.waitForReadyBrokers(); @@ -193,31 +199,54 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception { @Test public void disklessMigrationEnabled() throws Exception { - // Initialize cluster with diskless migration enabled var cluster = init(false, true, true); Map clientConfigs = new HashMap<>(); clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); try (Admin admin = AdminClient.create(clientConfigs)) { - // When creating a new topic with diskless.enable=false - final String classicTopic = "classicTopic"; - createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // When creating a new topic with diskless.enable=false AND remote.log.storage.enable=true + final String tieredTopic = "tieredTopic"; + createTopic(admin, tieredTopic, Map.of( + DISKLESS_ENABLE_CONFIG, "false", + REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + )); // Then diskless.enable is set to false in the topic config - var classicTopicConfig = getTopicConfig(admin, classicTopic); - assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + var tieredTopicConfig = getTopicConfig(admin, tieredTopic); + assertEquals("false", tieredTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + assertEquals("true", tieredTopicConfig.get("remote.storage.enable")); - // When migration is enabled, it SHOULD be possible to turn on diskless after the topic is created - alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); + // When migration is enabled AND remote storage is enabled, it SHOULD be possible to turn on diskless + alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true")); // Verify the config was updated - var updatedTopicConfig = getTopicConfig(admin, classicTopic); + var updatedTopicConfig = getTopicConfig(admin, tieredTopic); assertEquals("true", updatedTopicConfig.get(DISKLESS_ENABLE_CONFIG)); // But it should still NOT be possible to turn off diskless after enabling it - assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, tieredTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false"))); } cluster.close(); } + @Test + public void disklessMigrationRequiresRemoteStorage() throws Exception { + var cluster = init(false, true, true); + Map clientConfigs = new HashMap<>(); + clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + + try (Admin admin = AdminClient.create(clientConfigs)) { + // When creating a new topic with diskless.enable=false WITHOUT remote storage + final String classicTopic = "classicTopic"; + createTopic(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "false")); + // Then diskless.enable is set to false in the topic config + var classicTopicConfig = getTopicConfig(admin, classicTopic); + assertEquals("false", classicTopicConfig.get(DISKLESS_ENABLE_CONFIG)); + + // Even with migration enabled, it should NOT be possible to turn on diskless + // because remote storage is not enabled on this topic + assertThrows(ExecutionException.class, () -> alterTopicConfig(admin, classicTopic, Map.of(DISKLESS_ENABLE_CONFIG, "true"))); + } + cluster.close(); + } public void createTopic(Admin admin, String topic, Map configs) throws Exception { admin.createTopics(Collections.singletonList( diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 2a7a51ead74..02e7f15496f 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -482,33 +482,80 @@ class LogConfigTest { def testDisklessMigrationEnabled(): Unit = { val kafkaProps = TestUtils.createDummyBrokerConfig() val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - val isDisklessMigrationEnabled = true + val isDisklessAllowFromClassicEnabled = true + val isRemoteStorageSystemEnabled = true - val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") - val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + // Tiered storage topic (remote storage enabled at topic level) - migration candidate + val tieredTopicWithDisklessDisabled = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "false", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + ) + + // Diskless topic (no remote storage) + val disklessTopic = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true" + ) + + val migrateToDiskless = new Properties() + migrateToDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + migrateToDiskless.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - val setDisklessTrue = new Properties() - setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") val setDisklessFalse = new Properties() setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") - // When migration is enabled: - // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true - LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true for a tiered topic + LogConfig.validate(tieredTopicWithDisklessDisabled, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) assertThrows( classOf[InvalidConfigurationException], - () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + () => LogConfig.validate(disklessTopic, setDisklessFalse, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) ) + } - // 3. Should still be possible to keep diskless.enable=true - LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) + @Test + def testDisklessMigrationRequiresBothMigrationAndRemoteStorage(): Unit = { + val kafkaProps = TestUtils.createDummyBrokerConfig() + val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - // 4. Should still be possible to keep diskless.enable=false - LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false, isDisklessMigrationEnabled) - } + val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") + val setDisklessTrue = new Properties() + setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + + // Case 1: Migration enabled but remote storage NOT enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + false, // isRemoteLogStorageSystemEnabled + true) // isDisklessAllowFromClassicEnabled + ) + + // Case 2: Remote storage enabled but migration NOT enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + true, // isRemoteLogStorageSystemEnabled + false) // isDisklessAllowFromClassicEnabled + ) + + // Case 3: Neither migration nor remote storage enabled + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate( + disklessAlreadyDisabled, + setDisklessTrue, + kafkaConfig.extractLogConfigMap, + false, // isRemoteLogStorageSystemEnabled + false) // isDisklessAllowFromClassicEnabled + ) + } @Test def testValidDisklessAndRemoteStorageEnable(): Unit = { diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 60db3015d4b..ebf173d457e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -131,9 +131,9 @@ public class ServerConfigs { public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " + "This enables diskless topics alongside classic topics."; - public static final String DISKLESS_MIGRATION_ENABLE_CONFIG = "diskless.migration.enable"; - public static final boolean DISKLESS_MIGRATION_ENABLE_DEFAULT = false; - public static final String DISKLESS_MIGRATION_ENABLE_DOC = "Allow migrating existing topics from classic (diskless.enable=false) to diskless (diskless.enable=true). " + + public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG = "diskless.allow.from.classic.enable"; + public static final boolean DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT = false; + public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC = "Allow migrating existing topics with remote.storage.enable=true from classic (diskless.enable=false) to diskless (diskless.enable=true). " + "This should only be enabled in non-production environments for testing or migration purposes. " + "When enabled, topics can have their diskless.enable config changed from false to true."; @@ -184,8 +184,8 @@ public class ServerConfigs { /** Diskless Configurations **/ .define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH, DISKLESS_STORAGE_SYSTEM_ENABLE_DOC) - .define(DISKLESS_MIGRATION_ENABLE_CONFIG, BOOLEAN, DISKLESS_MIGRATION_ENABLE_DEFAULT, LOW, - DISKLESS_MIGRATION_ENABLE_DOC) + .define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW, + DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC) /** Internal Configurations **/ // This indicates whether unreleased APIs should be advertised by this node. .defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 56502942e67..d3d92296ed0 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -507,22 +507,20 @@ public static void validateBrokerLogConfigValues(Map props, private static void validateDiskless(Map existingConfigs, Map newConfigs, boolean isRemoteLogStorageEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled) + // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled and remote storage is enabled) if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled) - if (!isDisklessMigrationEnabled) { + // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled and remote storage is enabled) + if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { throw new InvalidConfigurationException("It is invalid to enable diskless"); } - // Migration is enabled, allow the change from false to true - } - - if (isRemoteLogStorageEnabled) { + // Migration is enabled and remote storage is enabled, allow the change from false to true + } else if (isRemoteLogStorageEnabled) { throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); } } else { @@ -542,12 +540,12 @@ private static void validateDiskless(Map existingConfigs, * @param existingConfigs The existing properties * @param newConfigs The new properties to be validated * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled - * @param isDisklessMigrationEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) + * @param isDisklessAllowFromClassicEnabled true if diskless migration is enabled (allows switching diskless.enable from false to true) */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, boolean isRemoteLogStorageSystemEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { validateValues(newConfigs); boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); @@ -563,7 +561,7 @@ private static void validateTopicLogConfigValues(Map existingCon validateTurningOffRemoteStorageWithDelete(newConfigs, wasRemoteLogEnabled, isRemoteLogStorageEnabled); } - validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessMigrationEnabled); + validateDiskless(existingConfigs, newConfigs, isRemoteLogStorageEnabled, isDisklessAllowFromClassicEnabled); } public static void validateTurningOffRemoteStorageWithDelete(Map newConfigs, boolean wasRemoteLogEnabled, boolean isRemoteLogStorageEnabled) { @@ -666,7 +664,7 @@ public static void validate(Map existingConfigs, Properties props, Map configuredProps, boolean isRemoteLogStorageSystemEnabled, - boolean isDisklessMigrationEnabled) { + boolean isDisklessAllowFromClassicEnabled) { validateNames(props); if (configuredProps == null || configuredProps.isEmpty()) { Map valueMaps = CONFIG.parse(props); @@ -675,7 +673,7 @@ public static void validate(Map existingConfigs, Map combinedConfigs = new HashMap<>(configuredProps); combinedConfigs.putAll(props); Map valueMaps = CONFIG.parse(combinedConfigs); - validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessMigrationEnabled); + validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, isDisklessAllowFromClassicEnabled); } } From b3e4ee380b75da0bd4be17e75cb4329befaa9612 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Thu, 22 Jan 2026 16:02:02 +0100 Subject: [PATCH 3/5] fixup! Clean logic and handle cases where diskless.enable is not set --- .../java/kafka/server/InklessConfigsTest.java | 9 +-- .../scala/unit/kafka/log/LogConfigTest.scala | 63 ++++++++++++------- .../storage/internals/log/LogConfig.java | 33 +++++++--- 3 files changed, 67 insertions(+), 38 deletions(-) diff --git a/core/src/test/java/kafka/server/InklessConfigsTest.java b/core/src/test/java/kafka/server/InklessConfigsTest.java index f5153b79d56..1fe85c84b62 100644 --- a/core/src/test/java/kafka/server/InklessConfigsTest.java +++ b/core/src/test/java/kafka/server/InklessConfigsTest.java @@ -80,14 +80,13 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setNumBrokerNodes(1) .setNumControllerNodes(1) .build(); - var builder = new KafkaClusterTestKit.Builder(nodes) + var cluster = new KafkaClusterTestKit.Builder(nodes) .setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1") .setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig)) .setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled)) .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") - .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, - "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") + .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager") // PG control plane config .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl()) @@ -101,9 +100,7 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true") .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey()) .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()) - .setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey()); - - var cluster = builder.build(); + .build(); cluster.format(); cluster.startup(); cluster.waitForReadyBrokers(); diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 02e7f15496f..5124a2c678e 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -452,65 +452,82 @@ class LogConfigTest { val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") - // 2 val setDisklessTrue = new Properties() setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") - // 5 val setDisklessFalse = new Properties() setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") - // 2. Given diskless.enable=true: - // 2.2 should be possible to set diskless.enable to true + // Given diskless.enable=true: + // Should be possible to set diskless.enable to true LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false) - // 2.5 Should NOT be possible to set diskless.enable to false + // Should NOT be possible to set diskless.enable to false assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false) ) - // 5. Given diskless.enable=false: - // 5.2 should NOT be possible to set diskless.enable to true + // Given diskless.enable=false: + // Should NOT be possible to set diskless.enable to true assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false) ) - // 5.5 Should be possible to set diskless.enable to false + // Should be possible to set diskless.enable to false LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false) + + // Given existing topic without diskless.enable set (e.g., created before diskless was introduced): + // Should NOT be possible to set diskless.enable to true + val topicWithoutDisklessConfig = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000") + assertThrows( + classOf[InvalidConfigurationException], + () => LogConfig.validate(topicWithoutDisklessConfig, setDisklessTrue, kafkaConfig.extractLogConfigMap, false) + ) } @Test def testDisklessMigrationEnabled(): Unit = { val kafkaProps = TestUtils.createDummyBrokerConfig() val kafkaConfig = KafkaConfig.fromProps(kafkaProps) - val isDisklessAllowFromClassicEnabled = true val isRemoteStorageSystemEnabled = true + // Given that migrating from classic to diskless is enabled + val isDisklessAllowFromClassicEnabled = true - // Tiered storage topic (remote storage enabled at topic level) - migration candidate + // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true for a tiered topic val tieredTopicWithDisklessDisabled = util.Map.of( TopicConfig.DISKLESS_ENABLE_CONFIG, "false", TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" ) - - // Diskless topic (no remote storage) - val disklessTopic = util.Map.of( - TopicConfig.DISKLESS_ENABLE_CONFIG, "true" - ) - val migrateToDiskless = new Properties() migrateToDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") migrateToDiskless.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - - val setDisklessFalse = new Properties() - setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") - - // 1. Should be possible to switch from diskless.enable=false to diskless.enable=true for a tiered topic LogConfig.validate(tieredTopicWithDisklessDisabled, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) + // 1.1 Should be possible to switch from no diskless config to diskless.enable=true for a tiered topic + val tieredTopicWithoutDisklessConfig = util.Map.of( + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + ) + LogConfig.validate(tieredTopicWithoutDisklessConfig, migrateToDiskless, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) + // 2. Should still NOT be possible to switch from diskless.enable=true to diskless.enable=false (even with migration enabled) + val disklessTopic = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true" + ) + val setDisklessFalse = new Properties() + setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false") assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(disklessTopic, setDisklessFalse, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) ) + + // 3. After migration (diskless=true with remote.storage=true), should still be able to alter other configs + val migratedTopic = util.Map.of( + TopicConfig.DISKLESS_ENABLE_CONFIG, "true", + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true" + ) + val keepMigratedState = new Properties() + keepMigratedState.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") + keepMigratedState.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + LogConfig.validate(migratedTopic, keepMigratedState, kafkaConfig.extractLogConfigMap, isRemoteStorageSystemEnabled, isDisklessAllowFromClassicEnabled) } @Test @@ -566,11 +583,11 @@ class LogConfigTest { logProps.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true") logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - // Add diskless + // Add diskless to existing TS topic (diskless not previously set) - treated same as diskless=false val t1 = assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true)) - assertEquals("Diskless and remote storage cannot be enabled simultaneously", t1.getMessage) + assertEquals("It is invalid to enable diskless", t1.getMessage) // Add remote storage val t2 = assertThrows( diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index d3d92296ed0..fc0b127ff43 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -508,25 +508,40 @@ private static void validateDiskless(Map existingConfigs, Map newConfigs, boolean isRemoteLogStorageEnabled, boolean isDisklessAllowFromClassicEnabled) { - Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)).map(Boolean::parseBoolean); + boolean isCreation = existingConfigs.isEmpty(); + Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) + .map(Boolean::parseBoolean); + boolean wasRemoteStorageEnabled = Boolean.parseBoolean( + existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) .ifPresent(isBeingEnabled -> { if (isBeingEnabled) { - // diskless.enable=true -> diskless.enable must be already set to true (unless migration is enabled and remote storage is enabled) - if (wasDiskless.isPresent() && !wasDiskless.get()) { - // cannot change from diskless.enable = false to diskless.enable = true (unless migration is enabled and remote storage is enabled) + if (isCreation) { + // Creation: diskless + TS not allowed + if (isRemoteLogStorageEnabled) { + throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); + } + } else if (wasDiskless.isPresent() && !wasDiskless.get()) { + // Update from explicitly false → true: only allowed with migration flag + TS + if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { + throw new InvalidConfigurationException("It is invalid to enable diskless"); + } + } else if (wasDiskless.isPresent() && wasDiskless.get()) { + // Keeping diskless=true: block if TS is being added + if (isRemoteLogStorageEnabled && !wasRemoteStorageEnabled) { + throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); + } + } else { + // wasDiskless not present on existing topic, treat same as explicitly false + // Only allowed with migration flag + TS if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { throw new InvalidConfigurationException("It is invalid to enable diskless"); } - // Migration is enabled and remote storage is enabled, allow the change from false to true - } else if (isRemoteLogStorageEnabled) { - throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); } } else { - // diskless.enable = false -> diskless.enable must be not set or set to false + // Cannot disable diskless once enabled if (wasDiskless.isPresent() && wasDiskless.get()) { - // cannot change from diskless.enable = true to diskless.enable = false throw new InvalidConfigurationException("It is invalid to disable diskless"); } } From a18df7510140c073d233c30982268a7bfa4a060b Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 23 Jan 2026 14:53:14 +0100 Subject: [PATCH 4/5] fixup! Refactor and fix exception messages --- .../scala/unit/kafka/log/LogConfigTest.scala | 10 ++--- .../storage/internals/log/LogConfig.java | 37 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 5124a2c678e..e2dbab7b1f9 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -575,7 +575,7 @@ class LogConfigTest { } @Test - def testValidDisklessAndRemoteStorageEnable(): Unit = { + def testInvalidDisklessAndRemoteStorageEnable(): Unit = { val kafkaProps = TestUtils.createDummyBrokerConfig() val kafkaConfig = KafkaConfig.fromProps(kafkaProps) @@ -587,19 +587,19 @@ class LogConfigTest { val t1 = assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true)) - assertEquals("It is invalid to enable diskless", t1.getMessage) + assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config log.diskless.enable must also be enabled.", t1.getMessage) // Add remote storage val t2 = assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true)) - assertEquals("Diskless and remote storage cannot be enabled simultaneously", t2.getMessage) + assertEquals("It is invalid to enable remote storage on an existing diskless topic.", t2.getMessage) - // Add both + // Create a diskless topic with remote storage enabled is invalid val t3 = assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, true)) - assertEquals("Diskless and remote storage cannot be enabled simultaneously", t3.getMessage) + assertEquals("It is invalid to create a diskless topic with remote storage enabled.", t3.getMessage) } @Test diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index fc0b127ff43..dd35f8e87c1 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -509,40 +509,39 @@ private static void validateDiskless(Map existingConfigs, boolean isRemoteLogStorageEnabled, boolean isDisklessAllowFromClassicEnabled) { boolean isCreation = existingConfigs.isEmpty(); - Optional wasDiskless = Optional.ofNullable(existingConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) - .map(Boolean::parseBoolean); + boolean wasDisklessEnabled = Boolean.parseBoolean(existingConfigs.getOrDefault(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")); boolean wasRemoteStorageEnabled = Boolean.parseBoolean( existingConfigs.getOrDefault(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")); Optional.ofNullable((Boolean) newConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG)) - .ifPresent(isBeingEnabled -> { - if (isBeingEnabled) { + .ifPresent(disklessIsBeingEnabled -> { + if (disklessIsBeingEnabled) { + // Enabling or keeping diskless enabled if (isCreation) { - // Creation: diskless + TS not allowed + // Creation: diskless + remote storage not allowed if (isRemoteLogStorageEnabled) { - throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); + throw new InvalidConfigurationException( + "It is invalid to create a diskless topic with remote storage enabled."); } - } else if (wasDiskless.isPresent() && !wasDiskless.get()) { - // Update from explicitly false → true: only allowed with migration flag + TS - if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { - throw new InvalidConfigurationException("It is invalid to enable diskless"); - } - } else if (wasDiskless.isPresent() && wasDiskless.get()) { - // Keeping diskless=true: block if TS is being added + } else if (wasDisklessEnabled) { + // Diskless already enabled: block adding remote storage to existing diskless topic if (isRemoteLogStorageEnabled && !wasRemoteStorageEnabled) { - throw new InvalidConfigurationException("Diskless and remote storage cannot be enabled simultaneously"); + throw new InvalidConfigurationException( + "It is invalid to enable remote storage on an existing diskless topic."); } } else { - // wasDiskless not present on existing topic, treat same as explicitly false - // Only allowed with migration flag + TS + // Was not diskless (false or not set): migration requires both flags if (!isDisklessAllowFromClassicEnabled || !isRemoteLogStorageEnabled) { - throw new InvalidConfigurationException("It is invalid to enable diskless"); + throw new InvalidConfigurationException("To migrate a classic topic to diskless, both " + + TopicConfig.DISKLESS_ENABLE_CONFIG + " and " + + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG + " must be set to true, and the broker config " + + ServerLogConfigs.DISKLESS_ENABLE_CONFIG + " must also be enabled."); } } } else { // Cannot disable diskless once enabled - if (wasDiskless.isPresent() && wasDiskless.get()) { - throw new InvalidConfigurationException("It is invalid to disable diskless"); + if (wasDisklessEnabled) { + throw new InvalidConfigurationException("It is invalid to disable diskless."); } } }); From 0fc34b671cc5fe2bf533ca6bc9fc4fd1e059d96d Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Mon, 26 Jan 2026 10:46:33 +0100 Subject: [PATCH 5/5] fixup! Fix error message --- core/src/test/scala/unit/kafka/log/LogConfigTest.scala | 2 +- .../java/org/apache/kafka/storage/internals/log/LogConfig.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index e2dbab7b1f9..280d13d9be5 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -587,7 +587,7 @@ class LogConfigTest { val t1 = assertThrows( classOf[InvalidConfigurationException], () => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true)) - assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config log.diskless.enable must also be enabled.", t1.getMessage) + assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config diskless.allow.from.classic.enable must also be enabled.", t1.getMessage) // Add remote storage val t2 = assertThrows( diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index dd35f8e87c1..ae4ef4b32ab 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.ConfigUtils; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.config.QuotaConfig; +import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.config.ServerTopicConfigSynonyms; import org.apache.kafka.server.record.BrokerCompressionType; @@ -535,7 +536,7 @@ private static void validateDiskless(Map existingConfigs, throw new InvalidConfigurationException("To migrate a classic topic to diskless, both " + TopicConfig.DISKLESS_ENABLE_CONFIG + " and " + TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG + " must be set to true, and the broker config " - + ServerLogConfigs.DISKLESS_ENABLE_CONFIG + " must also be enabled."); + + ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG + " must also be enabled."); } } } else {