Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Node, TopicIdPartition, Uuid}
import org.apache.kafka.storage.internals.log.LogConfig

import java.util
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Supplier
import java.util.stream.{Collectors, IntStream}
import java.{lang, util}
import scala.jdk.CollectionConverters._

class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConfig: Supplier[util.Map[String, Object]]) extends MetadataView {
Expand All @@ -54,7 +54,7 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
defaultConfig.get().asScala.filter(_._2 != null).asJava
}

override def getAliveBrokerNodes(listenerName: ListenerName): lang.Iterable[Node] = {
/** Live broker nodes reachable on the given listener, as a Java list (delegates to the KRaft metadata cache). */
override def getAliveBrokerNodes(listenerName: ListenerName): util.List[Node] = {
metadataCache.getAliveBrokerNodes(listenerName)
}

Expand All @@ -70,6 +70,10 @@ class InklessMetadataView(val metadataCache: KRaftMetadataCache, val defaultConf
metadataCache.topicConfig(topicName).getProperty(TopicConfig.DISKLESS_ENABLE_CONFIG, "false").toBoolean
}

/** True when the topic's `remote.log.storage.enable` config is "true" (defaults to "false" when unset).
 * NOTE(review): `String.toBoolean` is case-insensitive but throws IllegalArgumentException for any value
 * other than true/false — presumably the config is validated upstream; TODO confirm. */
override def isRemoteStorageEnabled(topicName: String): Boolean = {
metadataCache.topicConfig(topicName).getProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false").toBoolean
}

override def getDisklessTopicPartitions: util.Set[TopicIdPartition] = {
metadataCache.getAllTopics().stream()
.filter(isDisklessTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,31 @@ class InklessMetadataViewTest {
}
}

private def createTopicProps(diskless: Option[String]): Properties = {
@Test
def testIsRemoteStorageEnabled(): Unit = {
  val cache = mock(classOf[KRaftMetadataCache])
  val view = new InklessMetadataView(cache, null)

  // (description, topic properties, expected result); the description doubles as the topic name.
  val cases = Seq(
    ("remote.log.storage.enable=true", createTopicProps(remoteStorageEnable = Some("true")), true),
    ("case-insensitive", createTopicProps(remoteStorageEnable = Some("TRUE")), true),
    ("remote.log.storage.enable=false", createTopicProps(remoteStorageEnable = Some("false")), false),
    ("empty properties", new Properties(), false),
    ("unrelated properties", { val p = new Properties(); p.put("foo", "bar"); p }, false),
  )

  for ((description, props, expected) <- cases) {
    // Spaces are not valid in topic names, so derive a dashed topic name per case.
    val topicName = description.replaceAll(" ", "-")
    when(cache.topicConfig(topicName)).thenReturn(props)
    assertEquals(expected, view.isRemoteStorageEnabled(topicName), s"Failed on case: '$description'")
  }
}

/** Builds topic Properties, setting the diskless and/or remote-storage flags only when provided. */
private def createTopicProps(diskless: Option[String] = None, remoteStorageEnable: Option[String] = None): Properties = {
  val topicProps = new Properties()
  for (value <- diskless) topicProps.put(TopicConfig.DISKLESS_ENABLE_CONFIG, value)
  for (value <- remoteStorageEnable) topicProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, value)
  topicProps
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.storage.internals.log.LogConfig;

import java.util.List;
import java.util.Set;

public interface MetadataView {
Iterable<Node> getAliveBrokerNodes(ListenerName listenerName);
List<Node> getAliveBrokerNodes(ListenerName listenerName);

Integer getBrokerCount();

Uuid getTopicId(String topicName);

boolean isDisklessTopic(String topicName);

boolean isRemoteStorageEnabled(String topicName);

LogConfig getTopicConfig(String topicName);

Set<TopicIdPartition> getDisklessTopicPartitions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,35 @@
import java.util.concurrent.TimeUnit;

public class ClientAzAwarenessMetrics implements Closeable {
// Client AZ matching metrics
private static final String CLIENT_AZ_HIT_RATE = "client-az-hit-rate";
private static final String CLIENT_AZ_MISS_RATE = "client-az-miss-rate";
private static final String CLIENT_AZ_UNAWARE_RATE = "client-az-unaware-rate";
// Transformer targets metrics
private static final String FALLBACK_TOTAL = "fallback-total";
private static final String OFFLINE_REPLICAS_ROUTED_AROUND = "offline-replicas-routed-around";
private static final String CROSS_AZ_ROUTING_TOTAL = "cross-az-routing-total";
// Tags
public static final String CLIENT_AZ_TAG = "client-az";

private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(
ClientAzAwarenessMetrics.class.getPackageName(), ClientAzAwarenessMetrics.class.getSimpleName());
final Meter clientAzHitRate;
final Meter clientAzMissRate;
final Map<String, Meter> clientAzHitRatesPerAz;
final Meter clientAzUnawareRate;
final Meter fallbackTotal;
final Meter offlineReplicasRoutedAround;
final Meter crossAzRoutingTotal;

/**
 * Registers every untagged meter with the metrics group up front.
 * The per-AZ hit-rate map is only pre-sized here (presumably for a typical
 * 3-AZ deployment — verify); its meters are created elsewhere on first use.
 */
public ClientAzAwarenessMetrics() {
clientAzUnawareRate = metricsGroup.newMeter(CLIENT_AZ_UNAWARE_RATE, "requests", TimeUnit.SECONDS, Map.of());
clientAzMissRate = metricsGroup.newMeter(CLIENT_AZ_MISS_RATE, "requests", TimeUnit.SECONDS, Map.of());
clientAzHitRate = metricsGroup.newMeter(CLIENT_AZ_HIT_RATE, "requests", TimeUnit.SECONDS, Map.of());
clientAzHitRatesPerAz = new ConcurrentHashMap<>(3);
fallbackTotal = metricsGroup.newMeter(FALLBACK_TOTAL, "requests", TimeUnit.SECONDS, Map.of());
offlineReplicasRoutedAround = metricsGroup.newMeter(OFFLINE_REPLICAS_ROUTED_AROUND, "requests", TimeUnit.SECONDS, Map.of());
crossAzRoutingTotal = metricsGroup.newMeter(CROSS_AZ_ROUTING_TOTAL, "requests", TimeUnit.SECONDS, Map.of());
}

public void recordClientAz(String clientAZ, boolean foundBrokersInClientAZ) {
Expand All @@ -59,15 +72,41 @@ public void recordClientAz(String clientAZ, boolean foundBrokersInClientAZ) {
CLIENT_AZ_HIT_RATE,
"requests",
TimeUnit.SECONDS,
Map.of("client-az", clientAZ)
Map.of(CLIENT_AZ_TAG, clientAZ)
)).mark();
}

/**
 * Record when routing falls back to a non-replica broker.
 * This only happens for diskless-only topics (remote storage disabled).
 */
public void recordFallback() {
fallbackTotal.mark();
}

/**
 * Record when routing around offline replicas to find an available leader.
 * Indicates some replicas were offline but an alternative was found.
 */
public void recordOfflineReplicasRoutedAround() {
offlineReplicasRoutedAround.mark();
}

/**
 * Record when routing to a broker in a different AZ than the client.
 * One mark per routed request.
 */
public void recordCrossAzRouting() {
crossAzRoutingTotal.mark();
}

/**
 * Unregisters every meter created by this class: the fixed-name meters from the
 * constructor plus each lazily-created per-AZ hit meter.
 * Fix: the span previously contained both the old and the new per-AZ removal line
 * (diff residue), removing the per-AZ meter twice with inconsistent tag keys
 * ("az" vs CLIENT_AZ_TAG); a single removal using CLIENT_AZ_TAG matches the tag
 * the meters are registered under.
 */
@Override
public void close() {
    metricsGroup.removeMetric(CLIENT_AZ_HIT_RATE);
    metricsGroup.removeMetric(CLIENT_AZ_MISS_RATE);
    metricsGroup.removeMetric(CLIENT_AZ_UNAWARE_RATE);
    metricsGroup.removeMetric(FALLBACK_TOTAL);
    metricsGroup.removeMetric(OFFLINE_REPLICAS_ROUTED_AROUND);
    metricsGroup.removeMetric(CROSS_AZ_ROUTING_TOTAL);
    // Per-AZ hit meters were registered with the client-az tag; remove with the same tag.
    clientAzHitRatesPerAz.keySet().forEach(az -> metricsGroup.removeMetric(CLIENT_AZ_HIT_RATE, Map.of(CLIENT_AZ_TAG, az)));
}
}
Loading
Loading