Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -336,6 +337,58 @@ void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception {
cluster.close();
}
}

@Test
void changingCleanupPolicyFromCompactToDeleteRequiresRemoteStorageEnable() throws Exception {
final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled();
final Map<String, Object> clientConfigs = new HashMap<>();
clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

try (final Admin admin = AdminClient.create(clientConfigs)) {
final String compactedTopic = "compacted-to-delete-rejected";
createTopic(admin, compactedTopic, Map.of(
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"
));

final ExecutionException exception = assertThrows(
ExecutionException.class,
() -> alterTopicConfig(admin, compactedTopic, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE))
);
assertEquals(
"It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
"when classic.remote.storage.force.enable is enabled.",
exception.getCause().getMessage()
);
} finally {
cluster.close();
}
}

@Test
void changingCleanupPolicyFromCompactToDeleteWithRemoteStorageEnableIsAllowed() throws Exception {
final KafkaClusterTestKit cluster = initWithClassicRemoteStorageForceEnabled();
final Map<String, Object> clientConfigs = new HashMap<>();
clientConfigs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());

try (final Admin admin = AdminClient.create(clientConfigs)) {
final String compactedTopic = "compacted-to-delete-allowed";
createTopic(admin, compactedTopic, Map.of(
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"
));
alterTopicConfig(admin, compactedTopic, Map.of(
CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE,
REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
));

final Map<String, String> alteredTopicConfig = getTopicConfig(admin, compactedTopic);
assertEquals(CLEANUP_POLICY_DELETE, alteredTopicConfig.get(CLEANUP_POLICY_CONFIG));
assertEquals("true", alteredTopicConfig.get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
} finally {
cluster.close();
}
}
}

public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,31 @@ void maybeForceRemoteStorageEnable(
}
}

void validateCompactedToDeleteCleanupPolicyTransition(
final String topicName,
final Map<String, String> targetConfigs,
final Map<String, String> existingConfigs
) {
if (!enabled
|| isInternalTopic(topicName)
|| topicExcludedByRegex(topicName)) {
return;
}
if (!cleanupPolicyContainsCompact(existingConfigs)) {
return;
}
if (!cleanupPolicyIsDeleteOnly(targetConfigs)) {
return;
}
final boolean disklessEnabled = Boolean.parseBoolean(targetConfigs.get(TopicConfig.DISKLESS_ENABLE_CONFIG));
final boolean remoteStorageEnabled = Boolean.parseBoolean(targetConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG));
if (!disklessEnabled && !remoteStorageEnabled) {
throw new org.apache.kafka.common.errors.InvalidConfigurationException(
"It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
"when classic.remote.storage.force.enable is enabled.");
}
}

private boolean shouldForceRemoteStorageEnable(
final String topicName,
final boolean disklessEnabled,
Expand Down Expand Up @@ -100,6 +125,25 @@ private boolean cleanupPolicyContainsCompact(final Map<String, String> topicConf
return false;
}

private boolean cleanupPolicyIsDeleteOnly(final Map<String, String> topicConfigs) {
final String cleanupPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG);
if (cleanupPolicy == null) {
return false;
}
int nonEmptyPolicies = 0;
for (String policy : cleanupPolicy.split(",")) {
final String trimmedPolicy = policy.trim();
if (trimmedPolicy.isEmpty()) {
continue;
}
nonEmptyPolicies++;
if (!TopicConfig.CLEANUP_POLICY_DELETE.equals(trimmedPolicy)) {
return false;
}
}
return nonEmptyPolicies == 1;
}

private boolean topicExcludedByRegex(final String topicName) {
for (Pattern pattern : excludeTopicPatterns) {
if (pattern.matcher(topicName).matches()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class ConfigurationControlManager {
private final Map<String, Object> staticConfig;
private final ConfigResource currentController;
private final FeatureControlManager featureControl;
private final ClassicTopicRemoteStorageForcePolicy classicTopicRemoteStorageForcePolicy;

static class Builder {
private LogContext logContext = null;
Expand All @@ -88,6 +89,8 @@ static class Builder {
private Map<String, Object> staticConfig = Map.of();
private int nodeId = 0;
private FeatureControlManager featureControl = null;
private boolean classicRemoteStorageForceEnabled = false;
private List<String> classicRemoteStorageForceExcludeTopicRegexes = List.of();

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
Expand Down Expand Up @@ -134,6 +137,16 @@ Builder setFeatureControl(FeatureControlManager featureControl) {
return this;
}

Builder setClassicRemoteStorageForceEnabled(boolean classicRemoteStorageForceEnabled) {
this.classicRemoteStorageForceEnabled = classicRemoteStorageForceEnabled;
return this;
}

Builder setClassicRemoteStorageForceExcludeTopicRegexes(List<String> classicRemoteStorageForceExcludeTopicRegexes) {
this.classicRemoteStorageForceExcludeTopicRegexes = classicRemoteStorageForceExcludeTopicRegexes;
return this;
}

ConfigurationControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
Expand All @@ -152,7 +165,9 @@ ConfigurationControlManager build() {
validator,
staticConfig,
nodeId,
featureControl);
featureControl,
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes);
}
}

Expand All @@ -164,7 +179,9 @@ private ConfigurationControlManager(LogContext logContext,
ConfigurationValidator validator,
Map<String, Object> staticConfig,
int nodeId,
FeatureControlManager featureControl
FeatureControlManager featureControl,
boolean classicRemoteStorageForceEnabled,
List<String> classicRemoteStorageForceExcludeTopicRegexes
) {
this.log = logContext.logger(ConfigurationControlManager.class);
this.snapshotRegistry = snapshotRegistry;
Expand All @@ -177,6 +194,10 @@ private ConfigurationControlManager(LogContext logContext,
this.staticConfig = Map.copyOf(staticConfig);
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
this.featureControl = featureControl;
this.classicTopicRemoteStorageForcePolicy = new ClassicTopicRemoteStorageForcePolicy(
classicRemoteStorageForceEnabled,
classicRemoteStorageForceExcludeTopicRegexes
);
}

SnapshotRegistry snapshotRegistry() {
Expand Down Expand Up @@ -373,6 +394,13 @@ private ApiError validateAlterConfig(
// in the list passed to the policy in order to maintain backwards compatibility
}
try {
if (configResource.type() == Type.TOPIC) {
classicTopicRemoteStorageForcePolicy.validateCompactedToDeleteCleanupPolicyTransition(
configResource.name(),
allConfigs,
existingConfigsMap
);
}
validator.validate(configResource, allConfigs, existingConfigsMap);
if (!newlyCreatedResource) {
existenceChecker.accept(configResource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,8 @@ private QuorumController(
setValidator(configurationValidator).
setStaticConfig(staticConfig).
setNodeId(nodeId).
setClassicRemoteStorageForceEnabled(classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(classicRemoteStorageForceExcludeTopicRegexes).
setFeatureControl(featureControl).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,34 @@ public void testIncrementalAlterConfig() {
invalidConfigValueResult.response().message());
}

@Test
public void testCleanupPolicyCompactToDeleteRejectedWhenClassicRemoteStorageForceEnabled() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
setClassicRemoteStorageForceEnabled(true).
build();

RecordTestUtils.replayAll(manager, List.of(
new ApiMessageAndVersion(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic")
.setName(TopicConfig.CLEANUP_POLICY_CONFIG).setValue(TopicConfig.CLEANUP_POLICY_COMPACT), (short) 0),
new ApiMessageAndVersion(new ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic")
.setName(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG).setValue("false"), (short) 0)
));

ControllerResult<ApiError> result = manager.incrementalAlterConfig(
MYTOPIC,
Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, entry(SET, TopicConfig.CLEANUP_POLICY_DELETE)),
false
);
assertEquals(Errors.INVALID_CONFIG, result.response().error());
assertEquals(
"It is invalid to change cleanup.policy from compact to delete without enabling remote.storage.enable " +
"when classic.remote.storage.force.enable is enabled.",
result.response().message()
);
}

@Test
public void testIncrementalAlterMultipleConfigValues() {
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
Expand Down