Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class ControllerServer(
setDefaultNumPartitions(config.numPartitions.intValue()).
setDefaultDisklessEnable(config.logDisklessEnable).
setDisklessStorageSystemEnabled(config.disklessStorageSystemEnabled).
setClassicRemoteStorageForceEnabled(config.classicRemoteStorageForceEnabled).
setClassicRemoteStorageForceExcludeTopicRegexes(config.classicRemoteStorageForceExcludeTopicRegexes).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/** Diskless Configuration */
val disklessStorageSystemEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG)
val disklessAllowFromClassicEnabled: Boolean = getBoolean(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG)
val classicRemoteStorageForceEnabled: Boolean = getBoolean(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG)
val classicRemoteStorageForceExcludeTopicRegexes: java.util.List[String] =
getList(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG)

def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
Expand Down
159 changes: 159 additions & 0 deletions core/src/test/java/kafka/server/InklessConfigsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
Expand All @@ -32,6 +33,7 @@
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -58,8 +60,12 @@
import io.aiven.inkless.test_utils.PostgreSQLTestContainer;
import io.aiven.inkless.test_utils.S3TestContainer;

import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.DISKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

@Testcontainers
Expand All @@ -74,6 +80,14 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
}

/** Convenience overload: classic remote-storage forcing disabled and no exclusion regexes. */
private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig,
                                 boolean disklessStorageEnableConfig,
                                 boolean isDisklessAllowFromClassicEnabled) throws Exception {
    return init(defaultDisklessEnableConfig, disklessStorageEnableConfig, isDisklessAllowFromClassicEnabled,
        false, Collections.emptyList());
}

private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig,
boolean disklessStorageEnableConfig,
boolean isDisklessAllowFromClassicEnabled,
boolean classicRemoteStorageForceEnabled,
List<String> classicRemoteStorageForceExcludeTopicRegexes) throws Exception {
final TestKitNodes nodes = new TestKitNodes.Builder()
.setCombined(true)
.setNumBrokerNodes(1)
Expand All @@ -84,6 +98,9 @@ private KafkaClusterTestKit init(boolean defaultDisklessEnableConfig, boolean di
.setConfigProp(ServerLogConfigs.DISKLESS_ENABLE_CONFIG, String.valueOf(defaultDisklessEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_STORAGE_SYSTEM_ENABLE_CONFIG, String.valueOf(disklessStorageEnableConfig))
.setConfigProp(ServerConfigs.DISKLESS_ALLOW_FROM_CLASSIC_ENABLE_CONFIG, String.valueOf(isDisklessAllowFromClassicEnabled))
.setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_ENABLE_CONFIG, String.valueOf(classicRemoteStorageForceEnabled))
.setConfigProp(ServerConfigs.CLASSIC_REMOTE_STORAGE_FORCE_EXCLUDE_TOPIC_REGEXES_CONFIG,
String.join(",", classicRemoteStorageForceExcludeTopicRegexes))
.setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
.setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
.setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
Expand Down Expand Up @@ -194,13 +211,155 @@ public void classicTopicWithDisklessDefaultTrueConfigs() throws Exception {
cluster.close();
}

@Nested
final class ClassicRemoteStorageForcePolicy {
    private static final List<String> EXCLUDED_TOPIC_REGEXES = List.of("_schemas", "mm2-(.*)");

    /** Starts a cluster with the classic remote-storage force policy on and the exclusion regexes above. */
    private KafkaClusterTestKit initWithClassicRemoteStorageForceEnabled() throws Exception {
        return init(false, true, false, true, EXCLUDED_TOPIC_REGEXES);
    }

    /** Builds an Admin client pointed at the given cluster's bootstrap servers. */
    private Admin newAdminClient(final KafkaClusterTestKit testKit) {
        return AdminClient.create(
            Map.<String, Object>of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, testKit.bootstrapServers()));
    }

    @Test
    void remoteStorageEnableIsAlwaysTrueForClassicTopics() throws Exception {
        final KafkaClusterTestKit testKit = initWithClassicRemoteStorageForceEnabled();
        try (final Admin admin = newAdminClient(testKit)) {
            // No remote config supplied: the policy injects remote.storage.enable=true.
            final String noRemoteConfigTopic = "classic-no-remote-config";
            assertEquals("true", createTopicAndGetRemoteStorageFromCreateResponse(admin, noRemoteConfigTopic, Map.of()));
            assertEquals("true", getTopicConfig(admin, noRemoteConfigTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // An explicit "false" is overridden by the policy.
            final String remoteFalseTopic = "classic-remote-false";
            assertEquals("true", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, remoteFalseTopic, Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")));
            assertEquals("true", getTopicConfig(admin, remoteFalseTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // An explicit "true" is kept as-is.
            final String remoteTrueTopic = "classic-remote-true";
            assertEquals("true", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, remoteTrueTopic, Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")));
            assertEquals("true", getTopicConfig(admin, remoteTrueTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
        } finally {
            testKit.close();
        }
    }

    @Test
    void compactedTopicsAreExcludedFromForcePolicy() throws Exception {
        final KafkaClusterTestKit testKit = initWithClassicRemoteStorageForceEnabled();
        try (final Admin admin = newAdminClient(testKit)) {
            // Compacted topic gets remote.storage.enable=false when not specified
            final String compactedNoRemoteTopic = "compacted-no-remote";
            assertEquals("false", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, compactedNoRemoteTopic, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)));
            assertEquals("false", getTopicConfig(admin, compactedNoRemoteTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // Setting remote.storage.enable=false is allowed for a compacted topic
            final String compactedRemoteFalseTopic = "compacted-remote-false";
            final Map<String, String> compactedRemoteFalseConfigs = Map.of(
                CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
                REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"
            );
            assertEquals("false", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, compactedRemoteFalseTopic, compactedRemoteFalseConfigs));
            assertEquals("false", getTopicConfig(admin, compactedRemoteFalseTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // Compacted tiered topics are not supported by Kafka
            final String compactedRemoteTrueTopic = "compacted-remote-true";
            final Map<String, String> compactedRemoteTrueConfigs = Map.of(
                CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT,
                REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"
            );
            final ExecutionException exception = assertThrows(
                ExecutionException.class,
                () -> createTopicAndGetRemoteStorageFromCreateResponse(admin, compactedRemoteTrueTopic, compactedRemoteTrueConfigs)
            );
            assertEquals(
                "Remote log storage only supports topics with cleanup.policy=delete or cleanup.policy being an empty list.",
                exception.getCause().getMessage()
            );
        } finally {
            testKit.close();
        }
    }

    @Test
    void regexExcludedTopicsAreExcludedFromForcePolicy() throws Exception {
        final KafkaClusterTestKit testKit = initWithClassicRemoteStorageForceEnabled();
        try (final Admin admin = newAdminClient(testKit)) {
            // Excluded topic gets remote.storage.enable=false when not setting it
            final String schemasTopic = "_schemas";
            assertEquals("false", createTopicAndGetRemoteStorageFromCreateResponse(admin, schemasTopic, Map.of()));
            assertEquals("false", getTopicConfig(admin, schemasTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // Setting remote.storage.enable=false is allowed for an excluded topic
            final String mm2TopicRemoteFalse = "mm2-heartbeats";
            assertEquals("false", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, mm2TopicRemoteFalse, Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")));
            assertEquals("false", getTopicConfig(admin, mm2TopicRemoteFalse).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));

            // Excluded topic can still have remote.storage.enable=true if explicitly set
            final String mm2TopicRemoteTrue = "mm2-checkpoints";
            assertEquals("true", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, mm2TopicRemoteTrue, Map.of(REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")));
            assertEquals("true", getTopicConfig(admin, mm2TopicRemoteTrue).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
        } finally {
            testKit.close();
        }
    }

    @Test
    void compactedRegexExcludedTopicIsExcludedFromForcePolicy() throws Exception {
        final KafkaClusterTestKit testKit = initWithClassicRemoteStorageForceEnabled();
        try (final Admin admin = newAdminClient(testKit)) {
            // Both exemptions (regex match and compaction) at once still yield "false".
            final String compactedSchemasTopic = "_schemas";
            assertEquals("false", createTopicAndGetRemoteStorageFromCreateResponse(
                admin, compactedSchemasTopic, Map.of(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_COMPACT)));
            assertEquals("false", getTopicConfig(admin, compactedSchemasTopic).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG));
        } finally {
            testKit.close();
        }
    }
}

/**
 * Creates {@code topic} with one partition and replication factor 1, applying the
 * given topic configs, and waits up to 10 seconds for the creation to complete.
 */
public void createTopic(Admin admin, String topic, Map<String, String> configs) throws Exception {
    final NewTopic request = new NewTopic(topic, 1, (short) 1).configs(configs);
    admin.createTopics(List.of(request)).all().get(10, TimeUnit.SECONDS);
}

/**
 * Creates {@code topic} (one partition, RF 1) with the given configs, waits for the
 * creation to finish, then returns the value of {@code remote.storage.enable} as
 * reported in the CreateTopics response itself (not a later describe call).
 */
private String createTopicAndGetRemoteStorageFromCreateResponse(
    final Admin admin,
    final String topic,
    final Map<String, String> configs
) throws Exception {
    final NewTopic request = new NewTopic(topic, 1, (short) 1).configs(configs);
    final CreateTopicsResult result = admin.createTopics(List.of(request));
    result.all().get(10, TimeUnit.SECONDS);
    final ConfigEntry remoteStorageEntry =
        result.config(topic).get(10, TimeUnit.SECONDS).get(REMOTE_LOG_STORAGE_ENABLE_CONFIG);
    assertNotNull(remoteStorageEntry);
    return remoteStorageEntry.value();
}

private Map<String, String> getTopicConfig(Admin admin, String topic)
throws ExecutionException, InterruptedException, TimeoutException {
int maxRetries = 3;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.internals.Topic;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;

/**
 * Controller-side policy that forces {@code remote.storage.enable=true} on classic
 * (non-diskless) topics when enabled. A topic is exempt from forcing when it is
 * diskless, internal, compacted, or its name fully matches one of the configured
 * exclusion regexes.
 */
final class ClassicTopicRemoteStorageForcePolicy {

    // Topics treated as internal in addition to those matched by Topic.isInternal().
    private static final Set<String> ADDITIONAL_INTERNAL_TOPICS =
        Set.of(CLUSTER_METADATA_TOPIC_NAME, "__remote_log_metadata");

    // Master switch: when false, both maybeForceRemoteStorageEnable overloads are no-ops.
    private final boolean enabled;
    // Compiled once at construction; a FULL match (Matcher.matches) exempts a topic.
    private final List<Pattern> excludeTopicPatterns;

    /**
     * @param enabled             whether the force policy is active
     * @param excludeTopicRegexes topic-name regexes (full-match semantics) naming topics
     *                            exempt from forcing; each must be a valid pattern,
     *                            otherwise {@link java.util.regex.PatternSyntaxException}
     *                            is thrown here at construction time
     */
    ClassicTopicRemoteStorageForcePolicy(final boolean enabled, final List<String> excludeTopicRegexes) {
        this.enabled = enabled;
        this.excludeTopicPatterns = excludeTopicRegexes.stream().map(Pattern::compile).toList();
    }

    /**
     * Alter-config variant: records a SET op of {@code remote.storage.enable=true}
     * into {@code targetConfigOps} when the policy applies to the topic.
     *
     * @param topicName       name of the topic under consideration
     * @param disklessEnabled whether the topic is diskless (diskless topics are never forced)
     * @param requestConfigs  configs from the request, consulted for cleanup.policy
     * @param targetConfigOps op map mutated in place with the forced SET entry
     */
    void maybeForceRemoteStorageEnable(
        final String topicName,
        final boolean disklessEnabled,
        final Map<String, String> requestConfigs,
        final Map<String, Entry<OpType, String>> targetConfigOps
    ) {
        if (enabled && shouldForceRemoteStorageEnable(topicName, disklessEnabled, requestConfigs)) {
            targetConfigOps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, new SimpleImmutableEntry<>(SET, "true"));
        }
    }

    /**
     * Plain-config variant: writes {@code remote.storage.enable=true} directly into
     * {@code targetConfigs} when the policy applies to the topic.
     *
     * @param topicName       name of the topic under consideration
     * @param disklessEnabled whether the topic is diskless (diskless topics are never forced)
     * @param targetConfigs   config map mutated in place; also consulted for cleanup.policy
     */
    void maybeForceRemoteStorageEnable(
        final String topicName,
        final boolean disklessEnabled,
        final Map<String, String> targetConfigs
    ) {
        if (enabled && shouldForceRemoteStorageEnable(topicName, disklessEnabled, targetConfigs)) {
            targetConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
        }
    }

    // A topic is forced unless any exemption applies.
    private boolean shouldForceRemoteStorageEnable(
        final String topicName,
        final boolean disklessEnabled,
        final Map<String, String> topicConfigs
    ) {
        return !(disklessEnabled
            || isInternalTopic(topicName)
            || topicExcludedByRegex(topicName)
            || cleanupPolicyContainsCompact(topicConfigs));
    }

    // Kafka-internal topics plus the explicit extras above.
    private boolean isInternalTopic(final String topicName) {
        return Topic.isInternal(topicName) || ADDITIONAL_INTERNAL_TOPICS.contains(topicName);
    }

    // True when cleanup.policy (a comma-separated list) contains "compact";
    // an absent or empty policy counts as not compacted.
    private boolean cleanupPolicyContainsCompact(final Map<String, String> topicConfigs) {
        final String cleanupPolicy = topicConfigs.get(TopicConfig.CLEANUP_POLICY_CONFIG);
        if (cleanupPolicy == null || cleanupPolicy.isEmpty()) {
            return false;
        }
        for (final String policy : cleanupPolicy.split(",")) {
            if (TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy.trim())) {
                return true;
            }
        }
        return false;
    }

    // Full match of the topic name against any compiled exclusion pattern exempts it.
    private boolean topicExcludedByRegex(final String topicName) {
        return excludeTopicPatterns.stream().anyMatch(pattern -> pattern.matcher(topicName).matches());
    }
}
Loading