Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,12 @@ private BrokerTopicMetrics(Optional<String> name, boolean remoteStorageEnabled)
final MeterWrapper classicBytesInRate;
final MeterWrapper classicBytesOutRate;
if (isAllTopicsStats) {
// All-topics (broker-level) meters with topicType tag are eagerly initialized
// by MeterWrapper constructor since they only have topicType tag (global metrics).
classicBytesInRate = new MeterWrapper(BYTES_IN_PER_SEC, "bytes", classicTags);
classicBytesOutRate = new MeterWrapper(BYTES_OUT_PER_SEC, "bytes", classicTags);
disklessBytesInRate = new MeterWrapper(BYTES_IN_PER_SEC, "bytes", disklessTags);
disklessBytesOutRate = new MeterWrapper(BYTES_OUT_PER_SEC, "bytes", disklessTags);

// Keep broker-level behavior: eagerly initialize the classic meters.
classicBytesInRate.meter();
classicBytesOutRate.meter();
} else {
classicBytesInRate = new MeterWrapper(BYTES_IN_PER_SEC, "bytes", tags);
classicBytesOutRate = new MeterWrapper(BYTES_OUT_PER_SEC, "bytes", tags);
Expand Down Expand Up @@ -486,8 +484,14 @@ public MeterWrapper(String metricType, String eventType, Map<String, String> met
this.metricType = metricType;
this.eventType = eventType;
this.metricTags = new HashMap<>(metricTags);
if (this.metricTags.isEmpty()) {
meter(); // greedily initialize the general topic metrics
// Eagerly initialize global metrics so they are always registered and visible in monitoring
// systems from broker startup, even before any traffic is recorded.
// Global metrics are those with:
// - No tags (general broker-level metrics)
// - Only topicType tag (broker-level metrics split by topic type, e.g., classic vs diskless)
boolean onlyTopicTypeTag = this.metricTags.size() == 1 && this.metricTags.containsKey("topicType");
if (this.metricTags.isEmpty() || onlyTopicTypeTag) {
meter();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.storage.log.metrics;

import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Meter;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -39,6 +41,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -176,6 +179,128 @@ public void testDefaultMethodsUseClassicTopicType() {
"Default bytesOutRate() should return same meter as bytesOutRate(false) for all-topics stats");
}

/**
 * Exercises the all-topics (broker-level) meter accessors on a newly constructed
 * {@code BrokerTopicMetrics} instance:
 * - bytesIn/bytesOut for the classic variant ({@code false} argument)
 * - bytesIn/bytesOut for the diskless variant ({@code true} argument)
 * - messagesIn (the general, untagged broker metric)
 *
 * For every accessor, two consecutive calls must hand back the very same {@code Meter} object.
 * The motivation is that broker-level metrics should be visible in monitoring systems from
 * broker startup, before any traffic is recorded (regression context: aa045fbd).
 *
 * NOTE(review): a stable instance across two calls would also hold for a meter that is created
 * lazily on first access, so this test alone does not establish that the meters exist right
 * after construction. testAllTopicsMetersWithTopicTypeTagAreRegisteredOnConstruction inspects
 * the registry directly and is the stricter check.
 */
@Test
public void testAllTopicsMetersAreEagerlyInitialized() {
    // Dedicated instance for this test; released in the finally block below.
    final BrokerTopicMetrics brokerWideStats = new BrokerTopicMetrics(false);

    try {
        // Classic variant (topicType=classic).
        assertSame(brokerWideStats.bytesInRate(false), brokerWideStats.bytesInRate(false),
            "Classic bytesInRate should return same instance (eagerly initialized)");
        assertSame(brokerWideStats.bytesOutRate(false), brokerWideStats.bytesOutRate(false),
            "Classic bytesOutRate should return same instance (eagerly initialized)");

        // Diskless variant (topicType=diskless): the meters tagged only with topicType are
        // the ones the aa045fbd regression concerned.
        assertSame(brokerWideStats.bytesInRate(true), brokerWideStats.bytesInRate(true),
            "Diskless bytesInRate should return same instance (eagerly initialized)");
        assertSame(brokerWideStats.bytesOutRate(true), brokerWideStats.bytesOutRate(true),
            "Diskless bytesOutRate should return same instance (eagerly initialized)");

        // General broker metric that carries no tags.
        assertSame(brokerWideStats.messagesInRate(), brokerWideStats.messagesInRate(),
            "messagesInRate should return same instance (eagerly initialized for all-topics stats)");
    } finally {
        brokerWideStats.close();
    }
}

/**
 * Checks that the broker-level bytesIn/bytesOut meters tagged with topicType are present in
 * the Yammer registry as soon as a {@code BrokerTopicMetrics} has been constructed, without
 * any accessor being called first.
 *
 * Unlike testAllTopicsMetersAreEagerlyInitialized, this test looks the metrics up in the
 * registry itself, which is what monitoring systems observe from broker startup.
 *
 * Guards against the aa045fbd regression, in which eager initialization of metrics carrying
 * only the topicType tag was dropped.
 *
 * Limitation: there is no "absent before construction" pre-check, because metrics with these
 * tags may already have been registered by other tests; only presence afterwards is asserted.
 */
@Test
public void testAllTopicsMetersWithTopicTypeTagAreRegisteredOnConstruction() {
    final String tagKey = "topicType";
    final String bytesIn = "BytesInPerSec";
    final String bytesOut = "BytesOutPerSec";
    final String regressionNote =
        "(regression check for aa045fbd - metrics with only topicType tag must be eagerly initialized)";

    // Construction alone is expected to register the metrics; no meter accessor is invoked.
    final BrokerTopicMetrics brokerWideStats = new BrokerTopicMetrics(false);

    try {
        // topicType=classic
        assertTrue(isMetricRegistered(bytesIn, tagKey, "classic"),
            "BytesInPerSec with topicType=classic should be registered immediately on construction");
        assertTrue(isMetricRegistered(bytesOut, tagKey, "classic"),
            "BytesOutPerSec with topicType=classic should be registered immediately on construction");

        // topicType=diskless: the key assertions, since these meters were the ones left
        // uninitialized after aa045fbd.
        assertTrue(isMetricRegistered(bytesIn, tagKey, "diskless"),
            "BytesInPerSec with topicType=diskless should be registered immediately on construction "
                + regressionNote);
        assertTrue(isMetricRegistered(bytesOut, tagKey, "diskless"),
            "BytesOutPerSec with topicType=diskless should be registered immediately on construction "
                + regressionNote);
    } finally {
        brokerWideStats.close();
    }
}

/**
 * Reports whether the default Yammer registry currently holds a metric whose name equals
 * {@code metricName} and whose scope is non-null and contains the text
 * {@code tagKey + "." + tagValue}.
 *
 * The scope comparison is a substring match, so a scope that carries additional text around
 * the requested key/value pair also counts as a hit.
 *
 * @param metricName metric name to look for, compared with {@code equals}
 * @param tagKey     tag key expected inside the metric scope
 * @param tagValue   tag value expected right after {@code tagKey} and a dot
 * @return {@code true} if at least one registered metric matches, {@code false} otherwise
 */
private boolean isMetricRegistered(String metricName, String tagKey, String tagValue) {
    final String scopeFragment = tagKey + "." + tagValue;
    return KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
        .filter(candidate -> candidate.getName().equals(metricName))
        .filter(candidate -> candidate.getScope() != null)
        .anyMatch(candidate -> candidate.getScope().contains(scopeFragment));
}

/**
 * Covers the topic-specific counterpart of the all-topics tests: per-topic meters are meant to
 * be created lazily on first access, so that no meters are created for topics that may never
 * be accessed.
 *
 * What is actually asserted: accessing bytesInRate() on the topic-specific stats yields a
 * meter, and a second access returns that same instance. The "not yet created before first
 * access" half of laziness is not observable here without access to internals.
 */
@Test
public void testTopicSpecificMetersAreLazilyInitialized() {
    // First call creates (or fetches) the meter; second call must return the identical object.
    assertSame(topicSpecificStats.bytesInRate(), topicSpecificStats.bytesInRate(),
        "Topic-specific bytesInRate should return same instance after first access");
}

@Test
public void testMultipleTopicSpecificMetricsAreDistinct() {
// Different topics should have different meters (different topic name tags)
Expand Down
Loading