Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
kafkaConfig.disklessAllowFromClassicEnabled)
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
Expand Down
62 changes: 60 additions & 2 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -58,6 +59,7 @@
import io.aiven.inkless.test_utils.S3TestContainer;

import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand All @@ -68,7 +70,11 @@ public class InklessConfigsTest {
@Container
protected static MinioContainer s3Container = S3TestContainer.minio();

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception {
private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig) throws Exception {
return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, false);
}

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean disklessStorageEnableConfig, boolean isDisklessAllowFromClassicEnabled) throws Exception {
final TestKitNodes nodes = new TestKitNodes.Builder()
.setCombined(true)
.setNumBrokerNodes(1)
Expand All @@ -78,6 +84,9 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
.setConfigProp(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
.setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled))
.setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
.setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
// PG control plane config
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_CLASS_CONFIG, PostgresControlPlane.class.getName())
.setConfigProp(InklessConfig.PREFIX + InklessConfig.CONTROL_PLANE_PREFIX + PostgresControlPlaneConfig.CONNECTION_STRING_CONFIG, pgContainer.getJdbcUrl())
Expand All @@ -91,7 +100,6 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
.setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.S3_PATH_STYLE_ENABLED_CONFIG, "true")
.setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_ACCESS_KEY_ID_CONFIG, s3Container.getAccessKey())
.setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey())
.setConfigProp(InklessConfig.PREFIX + InklessConfig.STORAGE_PREFIX + S3StorageConfig.AWS_SECRET_ACCESS_KEY_CONFIG, s3Container.getSecretKey())
.build();
cluster.format();
cluster.startup();
Expand Down Expand Up @@ -186,6 +194,56 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
cluster.close();
}

@Test
public void disklessMigrationEnabled() throws Exception {
    // Cluster with diskless storage on, per-topic default off, and classic->diskless migration allowed.
    var kafkaCluster = init(false, true, true);
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());

    try (Admin admin = AdminClient.create(adminProps)) {
        final String topicName = "tieredTopic";
        // Create a tiered (remote-storage) topic that starts out non-diskless.
        createTopic(admin, topicName, Map.of(
            DISKLESS_ENABLE_CONFIG, "false",
            REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
        ));
        // The topic config must reflect exactly what was requested.
        var createdConfig = getTopicConfig(admin, topicName);
        assertEquals("false", createdConfig.get(DISKLESS_ENABLE_CONFIG));
        assertEquals("true", createdConfig.get("remote.storage.enable"));

        // With migration allowed and remote storage on, flipping diskless on must succeed...
        alterTopicConfig(admin, topicName, Map.of(DISKLESS_ENABLE_CONFIG, "true"));
        var alteredConfig = getTopicConfig(admin, topicName);
        assertEquals("true", alteredConfig.get(DISKLESS_ENABLE_CONFIG));

        // ...but the transition is one-way: disabling diskless afterwards is rejected.
        assertThrows(ExecutionException.class,
            () -> alterTopicConfig(admin, topicName, Map.of(DISKLESS_ENABLE_CONFIG, "false")));
    }
    kafkaCluster.close();
}

Comment thread
giuseppelillo marked this conversation as resolved.
@Test
public void disklessMigrationRequiresRemoteStorage() throws Exception {
    // Cluster with diskless storage on, per-topic default off, and classic->diskless migration allowed.
    var kafkaCluster = init(false, true, true);
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.bootstrapServers());

    try (Admin admin = AdminClient.create(adminProps)) {
        final String topicName = "classicTopic";
        // A plain classic topic: diskless explicitly off, no remote storage configured.
        createTopic(admin, topicName, Map.of(DISKLESS_ENABLE_CONFIG, "false"));
        var createdConfig = getTopicConfig(admin, topicName);
        assertEquals("false", createdConfig.get(DISKLESS_ENABLE_CONFIG));

        // The migration flag alone is not enough: without remote storage on the topic,
        // turning diskless on must be rejected.
        assertThrows(ExecutionException.class,
            () -> alterTopicConfig(admin, topicName, Map.of(DISKLESS_ENABLE_CONFIG, "true")));
    }
    kafkaCluster.close();
}

public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
admin.createTopics(Collections.singletonList(
Expand Down
123 changes: 109 additions & 14 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -452,59 +452,154 @@ class LogConfigTest {
val disklessAlreadyEnabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
val disklessAlreadyDisabled = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

// 2
val setDisklessTrue = new Properties()
setDisklessTrue.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
// 5
val setDisklessFalse = new Properties()
setDisklessFalse.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

// 2. Given diskless.enable=true:
// 2.2 should be possible to set diskless.enable to true
// Given diskless.enable=true:
// Should be possible to set diskless.enable to true
LogConfig.validate(disklessAlreadyEnabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
// 2.5 Should NOT be possible to set diskless.enable to false
// Should NOT be possible to set diskless.enable to false
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessAlreadyEnabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)
)

// 5. Given diskless.enable=false:
// 5.2 should NOT be possible to set diskless.enable to true
// Given diskless.enable=false:
// Should NOT be possible to set diskless.enable to true
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(disklessAlreadyDisabled, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
)
// 5.5 Should be possible to set diskless.enable to false
// Should be possible to set diskless.enable to false
LogConfig.validate(disklessAlreadyDisabled, setDisklessFalse, kafkaConfig.extractLogConfigMap, false)

// Given existing topic without diskless.enable set (e.g., created before diskless was introduced):
// Should NOT be possible to set diskless.enable to true
val topicWithoutDisklessConfig = util.Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")
assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(topicWithoutDisklessConfig, setDisklessTrue, kafkaConfig.extractLogConfigMap, false)
)
}

@Test
def testDisklessMigrationEnabled(): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())
  val remoteStorageSystemEnabled = true
  // Broker-level switch allowing classic -> diskless migration.
  val migrationAllowed = true

  // Small local builder so each scenario reads as data, not boilerplate.
  def topicProps(entries: (String, String)*): Properties = {
    val p = new Properties()
    entries.foreach { case (k, v) => p.put(k, v) }
    p
  }

  val enableDiskless = topicProps(
    TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true")

  // 1. A tiered topic with diskless.enable=false may be switched to diskless.
  val tieredNotDiskless = util.Map.of(
    TopicConfig.DISKLESS_ENABLE_CONFIG, "false",
    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
  )
  LogConfig.validate(tieredNotDiskless, enableDiskless, brokerConfig.extractLogConfigMap, remoteStorageSystemEnabled, migrationAllowed)

  // 1.1 Same for a tiered topic that never had diskless.enable set at all.
  val tieredNoDisklessEntry = util.Map.of(
    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
  )
  LogConfig.validate(tieredNoDisklessEntry, enableDiskless, brokerConfig.extractLogConfigMap, remoteStorageSystemEnabled, migrationAllowed)

  // 2. Migration being allowed does NOT make diskless -> classic legal.
  val alreadyDiskless = util.Map.of(
    TopicConfig.DISKLESS_ENABLE_CONFIG, "true"
  )
  val disableDiskless = topicProps(TopicConfig.DISKLESS_ENABLE_CONFIG -> "false")
  assertThrows(
    classOf[InvalidConfigurationException],
    () => LogConfig.validate(alreadyDiskless, disableDiskless, brokerConfig.extractLogConfigMap, remoteStorageSystemEnabled, migrationAllowed)
  )

  // 3. A topic already migrated (diskless + remote storage both true) can keep
  // re-asserting that state on subsequent config alterations.
  val migratedTopic = util.Map.of(
    TopicConfig.DISKLESS_ENABLE_CONFIG, "true",
    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
  )
  val unchangedMigratedState = topicProps(
    TopicConfig.DISKLESS_ENABLE_CONFIG -> "true",
    TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG -> "true")
  LogConfig.validate(migratedTopic, unchangedMigratedState, brokerConfig.extractLogConfigMap, remoteStorageSystemEnabled, migrationAllowed)
}
Comment thread
giuseppelillo marked this conversation as resolved.

@Test
def testDisklessMigrationRequiresBothMigrationAndRemoteStorage(): Unit = {
  val brokerConfig = KafkaConfig.fromProps(TestUtils.createDummyBrokerConfig())

  // Existing topic state: explicitly classic (diskless off).
  val currentlyClassic = util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "false")

  // Requested change: turn diskless on.
  val requestDiskless = new Properties()
  requestDiskless.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")

  // Every combination where at least one of the two broker-side prerequisites
  // (remote-storage system, migration allowance) is missing must be rejected:
  //   (false, true)  -> remote storage system off, migration allowed
  //   (true,  false) -> remote storage system on, migration not allowed
  //   (false, false) -> neither enabled
  for ((remoteStorageSystemEnabled, migrationAllowed) <- Seq((false, true), (true, false), (false, false))) {
    assertThrows(
      classOf[InvalidConfigurationException],
      () => LogConfig.validate(
        currentlyClassic,
        requestDiskless,
        brokerConfig.extractLogConfigMap,
        remoteStorageSystemEnabled,
        migrationAllowed)
    )
  }
}

Comment thread
giuseppelillo marked this conversation as resolved.
@Test
def testValidDisklessAndRemoteStorageEnable(): Unit = {
def testInvalidDisklessAndRemoteStorageEnable(): Unit = {
val kafkaProps = TestUtils.createDummyBrokerConfig()
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)

val logProps = new Properties
logProps.put(TopicConfig.DISKLESS_ENABLE_CONFIG, "true")
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

// Add diskless
// Add diskless to existing TS topic (diskless not previously set) - treated same as diskless=false
val t1 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("Diskless and remote storage cannot be enabled simultaneously", t1.getMessage)
assertEquals("To migrate a classic topic to diskless, both diskless.enable and remote.storage.enable must be set to true, and the broker config diskless.allow.from.classic.enable must also be enabled.", t1.getMessage)

// Add remote storage
val t2 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of(TopicConfig.DISKLESS_ENABLE_CONFIG, "true"), logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("Diskless and remote storage cannot be enabled simultaneously", t2.getMessage)
assertEquals("It is invalid to enable remote storage on an existing diskless topic.", t2.getMessage)

// Add both
// Create a diskless topic with remote storage enabled is invalid
val t3 = assertThrows(
classOf[InvalidConfigurationException],
() => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, true))
assertEquals("Diskless and remote storage cannot be enabled simultaneously", t3.getMessage)
assertEquals("It is invalid to create a diskless topic with remote storage enabled.", t3.getMessage)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public class ServerConfigs {
public static final String DISKLESS_STORAGE_SYSTEM_ENABLE_DOC = "Enable the diskless storage system. " +
"This enables diskless topics alongside classic topics.";

public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG = "diskless.allow.from.classic.enable";
public static final boolean DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT = false;
public static final String DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC = "Allow migrating existing topics with remote.storage.enable=true from classic (diskless.enable=false) to diskless (diskless.enable=true). " +
"This should only be enabled in non-production environments for testing or migration purposes. " +
"When enabled, topics can have their diskless.enable config changed from false to true.";


/************* Authorizer Configuration ***********/
public static final String AUTHORIZER_CLASS_NAME_CONFIG = "authorizer.class.name";
Expand Down Expand Up @@ -178,6 +184,8 @@ public class ServerConfigs {
/** Diskless Configurations **/
.define(DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, BOOLEAN, DISKLESS_STORAGE_SYSTEM_ENABLE_DEFAULT, HIGH,
DISKLESS_STORAGE_SYSTEM_ENABLE_DOC)
.define(DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, BOOLEAN, DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DEFAULT, LOW,
DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_DOC)
/** Internal Configurations **/
// This indicates whether unreleased APIs should be advertised by this node.
.defineInternal(UNSTABLE_API_VERSIONS_ENABLE_CONFIG, BOOLEAN, false, HIGH)
Expand Down
Loading