Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, InklessMetadataView, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand Down Expand Up @@ -377,13 +377,10 @@ class ControllerServer(
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
))

// Inkless metadata view needed to filter out Diskless topics from offline/leadership metrics
val inklessMetadataView = new InklessMetadataView(metadataCache, () => config.extractLogConfigMap)
// Set up the metrics publisher.
metadataPublishers.add(new ControllerMetadataMetricsPublisher(
sharedServer.controllerServerMetrics,
sharedServer.metadataPublishingFaultHandler,
t => inklessMetadataView.isDisklessTopic(t)
sharedServer.metadataPublishingFaultHandler
))

// Set up the ACL publisher.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
"ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec");
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");
private static final MetricName DISKLESS_TOPIC_COUNT = getMetricName(
"KafkaController", "DisklessTopicCount");
private static final MetricName DISKLESS_PARTITION_COUNT = getMetricName(
"KafkaController", "DisklessPartitionCount");
private static final MetricName DISKLESS_OFFLINE_PARTITION_COUNT = getMetricName(
"KafkaController", "DisklessOfflinePartitionCount");
Comment thread
jeqo marked this conversation as resolved.

private final Optional<MetricsRegistry> registry;
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
Expand All @@ -84,6 +90,9 @@ public final class ControllerMetadataMetrics implements AutoCloseable {
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
private Optional<Meter> electionFromEligibleLeaderReplicasMeter = Optional.empty();
private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
private final AtomicInteger disklessTopicCount = new AtomicInteger(0);
private final AtomicInteger disklessPartitionCount = new AtomicInteger(0);
private final AtomicInteger disklessOfflinePartitionCount = new AtomicInteger(0);

/**
* Create a new ControllerMetadataMetrics object.
Expand Down Expand Up @@ -151,6 +160,24 @@ public Integer value() {
return ignoredStaticVoters() ? 1 : 0;
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessTopicCount();
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessPartitionCount();
}
}));
registry.ifPresent(r -> r.newGauge(DISKLESS_OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return disklessOfflinePartitionCount();
}
}));
}

public void addBrokerRegistrationStateMetric(int brokerId) {
Expand Down Expand Up @@ -309,6 +336,42 @@ public boolean ignoredStaticVoters() {
return ignoredStaticVoters.get();
}

/**
 * Overwrites the diskless topic count gauge value.
 *
 * @param count the new total number of diskless topics
 */
public void setDisklessTopicCount(int count) {
    disklessTopicCount.set(count);
}

/**
 * Adjusts the diskless topic count gauge by a (possibly negative) delta.
 *
 * @param delta amount to add to the current diskless topic count
 */
public void addToDisklessTopicCount(int delta) {
    disklessTopicCount.addAndGet(delta);
}

/**
 * @return the current number of diskless topics
 */
public int disklessTopicCount() {
    return disklessTopicCount.get();
}

/**
 * Overwrites the diskless partition count gauge value.
 *
 * @param count the new total number of diskless partitions
 */
public void setDisklessPartitionCount(int count) {
    disklessPartitionCount.set(count);
}

/**
 * Adjusts the diskless partition count gauge by a (possibly negative) delta.
 *
 * @param delta amount to add to the current diskless partition count
 */
public void addToDisklessPartitionCount(int delta) {
    disklessPartitionCount.addAndGet(delta);
}

/**
 * @return the current number of diskless partitions
 */
public int disklessPartitionCount() {
    return disklessPartitionCount.get();
}

/**
 * Overwrites the diskless offline partition count gauge value.
 *
 * @param count the new number of diskless partitions currently offline
 */
public void setDisklessOfflinePartitionCount(int count) {
    disklessOfflinePartitionCount.set(count);
}

/**
 * Adjusts the diskless offline partition count gauge by a (possibly negative) delta.
 *
 * @param delta amount to add to the current diskless offline partition count
 */
public void addToDisklessOfflinePartitionCount(int delta) {
    disklessOfflinePartitionCount.addAndGet(delta);
}

/**
 * @return the current number of diskless partitions that are offline
 */
public int disklessOfflinePartitionCount() {
    return disklessOfflinePartitionCount.get();
}

@Override
public void close() {
registry.ifPresent(r -> List.of(
Expand All @@ -322,7 +385,10 @@ public void close() {
METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
IGNORED_STATIC_VOTERS
IGNORED_STATIC_VOTERS,
DISKLESS_TOPIC_COUNT,
DISKLESS_PARTITION_COUNT,
DISKLESS_OFFLINE_PARTITION_COUNT
).forEach(r::removeMetric));
for (int brokerId : brokerRegistrationStates.keySet()) {
removeBrokerRegistrationStateMetric(brokerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.kafka.controller.metrics;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicDelta;
Expand All @@ -28,9 +31,9 @@
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.fault.FaultHandler;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;


/**
Expand All @@ -45,16 +48,13 @@ public class ControllerMetadataMetricsPublisher implements MetadataPublisher {
private final ControllerMetadataMetrics metrics;
private final FaultHandler faultHandler;
private MetadataImage prevImage = MetadataImage.EMPTY;
private Function<String, Boolean> isDisklessTopic;

public ControllerMetadataMetricsPublisher(
ControllerMetadataMetrics metrics,
FaultHandler faultHandler,
Function<String, Boolean> isDisklessTopic
FaultHandler faultHandler
) {
this.metrics = metrics;
this.faultHandler = faultHandler;
this.isDisklessTopic = isDisklessTopic;
}

@Override
Expand All @@ -71,7 +71,7 @@ public void onMetadataUpdate(
switch (manifest.type()) {
case LOG_DELTA:
try {
publishDelta(delta);
publishDelta(delta, newImage);
} catch (Throwable e) {
faultHandler.handleFault("Failed to publish controller metrics from log delta " +
" ending at offset " + manifest.provenance().lastContainedOffset(), e);
Expand All @@ -92,8 +92,8 @@ public void onMetadataUpdate(
}
}

private void publishDelta(MetadataDelta delta) {
ControllerMetricsChanges changes = new ControllerMetricsChanges(isDisklessTopic);
private void publishDelta(MetadataDelta delta, MetadataImage newImage) {
ControllerMetricsChanges changes = new ControllerMetricsChanges();
if (delta.clusterDelta() != null) {
Comment thread
jeqo marked this conversation as resolved.
for (Entry<Integer, Optional<BrokerRegistration>> entry :
delta.clusterDelta().changedBrokers().entrySet()) {
Expand All @@ -111,10 +111,29 @@ private void publishDelta(MetadataDelta delta) {
throw new RuntimeException("Unable to find deleted topic id " + topicId +
" in previous topics image.");
}
changes.handleDeletedTopic(prevTopic);
// For deleted topics, check isDiskless from prevImage since config is already removed from newImage
changes.handleDeletedTopic(prevTopic, isDisklessTopic(prevImage.configs(), prevTopic.name()));
}
for (Entry<Uuid, TopicDelta> entry : delta.topicsDelta().changedTopics().entrySet()) {
changes.handleTopicChange(prevImage.topics().getTopic(entry.getKey()), entry.getValue());
changes.handleTopicChange(
prevImage.topics().getTopic(entry.getKey()),
entry.getValue(),
isDisklessTopic(newImage.configs(), entry.getValue().name()));
}
}
// Handle diskless.enable config changes on existing topics (no TopicDelta required).
// Recategorizes partitions between classic and diskless metric buckets.
if (delta.configsDelta() != null) {
for (ConfigResource resource : delta.configsDelta().changes().keySet()) {
if (resource.type() != ConfigResource.Type.TOPIC) continue;
boolean wasDiskless = isDisklessTopic(prevImage.configs(), resource.name());
boolean isDiskless = isDisklessTopic(newImage.configs(), resource.name());
if (wasDiskless != isDiskless) {
TopicImage topic = newImage.topics().getTopic(resource.name());
if (topic != null) {
changes.handleDisklessConfigChange(topic, isDiskless);
}
}
}
}
changes.apply(metrics);
Expand Down Expand Up @@ -142,23 +161,44 @@ private void publishSnapshot(MetadataImage newImage) {
int totalPartitions = 0;
int offlinePartitions = 0;
int partitionsWithoutPreferredLeader = 0;
int disklessTopics = 0;
int disklessPartitions = 0;
int disklessOfflinePartitions = 0;
for (TopicImage topicImage : newImage.topics().topicsById().values()) {
boolean isDiskless = isDisklessTopic.apply(topicImage.name());
// Check diskless from newImage configs directly for consistency with delta path
final boolean isDiskless = isDisklessTopic(newImage.configs(), topicImage.name());
if (isDiskless) {
disklessTopics++;
}
for (PartitionRegistration partition : topicImage.partitions().values()) {
if (!isDiskless) {
totalPartitions++;
if (isDiskless) {
disklessPartitions++;
if (!partition.hasLeader()) {
disklessOfflinePartitions++;
}
} else {
if (!partition.hasLeader()) {
offlinePartitions++;
}
if (!partition.hasPreferredLeader()) {
partitionsWithoutPreferredLeader++;
}
}
totalPartitions++;
}
}
metrics.setGlobalPartitionCount(totalPartitions);
metrics.setOfflinePartitionCount(offlinePartitions);
metrics.setPreferredReplicaImbalanceCount(partitionsWithoutPreferredLeader);
metrics.setDisklessTopicCount(disklessTopics);
metrics.setDisklessPartitionCount(disklessPartitions);
metrics.setDisklessOfflinePartitionCount(disklessOfflinePartitions);
}

/**
 * Checks whether a topic is configured as diskless, by reading the topic-level
 * config entries held in the given configurations image.
 *
 * @param configsImage the configurations image to read topic configs from
 * @param topicName    the topic whose diskless.enable setting is inspected
 * @return {@code true} iff the topic's diskless.enable config value parses as true
 */
private static boolean isDisklessTopic(ConfigurationsImage configsImage, String topicName) {
    final ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    final Map<String, String> topicConfigs = configsImage.configMapForResource(topicResource);
    final String disklessEnabled = topicConfigs.getOrDefault(TopicConfig.DISKLESS_ENABLE_CONFIG, "false");
    return Boolean.parseBoolean(disklessEnabled);
}

@Override
Expand Down
Loading
Loading