Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1894,9 +1894,10 @@ class ReplicaManager(val config: KafkaConfig,
classicFetchInfos += fetchInfo
} else {
val classicToDisklessStartOffset = _inklessMetadataView.getClassicToDisklessStartOffset(tp.topicPartition())
val shouldReadFromUnifiedLog =
classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
partitionData.fetchOffset < classicToDisklessStartOffset
val migrationPending =
classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING
val shouldReadFromUnifiedLog = migrationPending || (
classicToDisklessStartOffset >= 0 && partitionData.fetchOffset < classicToDisklessStartOffset)

(shouldReadFromUnifiedLog, config.disklessManagedReplicasEnabled) match {
case (false, _) =>
Expand Down Expand Up @@ -2850,7 +2851,7 @@ class ReplicaManager(val config: KafkaConfig,
Option(topicImage.partitions().get(partitionId))
}
val shouldInitOnControlPlane = previousPartition.exists { previous =>
previous.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET &&
previous.classicToDisklessStartOffset < 0 &&
partitionRegistration.classicToDisklessStartOffset >= 0
}

Expand Down
142 changes: 142 additions & 0 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6427,6 +6427,148 @@ class ReplicaManagerTest {
}
}

@Test
def testFetchDisklessMigrationPendingReadsFromClassicLogWhenManagedReplicasEnabled(): Unit = {
  val handlerCtor = mockFetchHandler(Map.empty)
  try {
    val controlPlane = mock(classOf[ControlPlane])
    val rm = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlane),
      disklessManagedReplicasEnabled = true))

    // The topic is diskless but its classic-to-diskless migration has not completed yet (-2).
    when(rm.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)

    // Any read from the unified log returns a successful result carrying RECORDS.
    val readResult = new LogReadResult(
      new FetchDataInfo(new LogOffsetMetadata(1L, 0L, 0), RECORDS),
      Optional.empty(), 10L, 0L, 10L, 0L, 0L, OptionalLong.empty(), Errors.NONE)
    doReturn(Seq(disklessTopicPartition -> readResult))
      .when(rm).readFromLog(any(), any(), any(), any())

    val params = new FetchParams(
      -1, -1L,
      0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val infos = Seq(
      disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()))

    @volatile var result: Map[TopicIdPartition, FetchPartitionData] = null
    rm.fetchMessages(params, infos, QuotaFactory.UNBOUNDED_QUOTA,
      (response: Seq[(TopicIdPartition, FetchPartitionData)]) => result = response.toMap)

    // While migration is pending the fetch must be served from the classic UnifiedLog path,
    // never from the diskless fetch handler or the control plane.
    assertNotNull(result)
    assertEquals(1, result.size)
    assertEquals(RECORDS, result(disklessTopicPartition).records)
    verify(rm, times(1)).readFromLog(any(), any(), any(), any())
    verify(handlerCtor.constructed().get(0), never()).handle(any(), any())
    verify(controlPlane, never()).findBatches(any(), any(), any())
  } finally {
    handlerCtor.close()
  }
}

@Test
def testFetchDisklessMigrationPendingFailsWhenManagedReplicasDisabled(): Unit = {
  val handlerCtor = mockFetchHandler(Map.empty)
  try {
    val controlPlane = mock(classOf[ControlPlane])
    val rm = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlane),
      disklessManagedReplicasEnabled = false,
    ))

    // Migration pending (-2): the data still lives in the classic UnifiedLog.
    when(rm.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING)

    val params = new FetchParams(
      -1, -1L,
      0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val infos = Seq(
      disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()))

    @volatile var result: Map[TopicIdPartition, FetchPartitionData] = null
    rm.fetchMessages(params, infos, QuotaFactory.UNBOUNDED_QUOTA,
      (response: Seq[(TopicIdPartition, FetchPartitionData)]) => result = response.toMap)

    // With managed replicas disabled the broker cannot serve the classic data,
    // so the partition is answered with INVALID_REQUEST and no read path is touched.
    assertNotNull(result)
    assertEquals(1, result.size)
    assertEquals(Errors.INVALID_REQUEST, result(disklessTopicPartition).error)
    verify(rm, never()).readFromLog(any(), any(), any(), any())
    verify(handlerCtor.constructed().get(0), never()).handle(any(), any())
    verify(controlPlane, never()).findBatches(any(), any(), any())
  } finally {
    handlerCtor.close()
  }
}

@Test
def testFetchFullDisklessTopicRoutesDiskless(): Unit = {
  val disklessResponse = Map(disklessTopicPartition ->
    new FetchPartitionData(
      Errors.NONE,
      110L, 100L,
      RECORDS,
      Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false))
  val handlerCtor = mockFetchHandler(disklessResponse)
  try {
    // Control plane advertises a single batch for the fetched partition.
    val metadata = mock(classOf[BatchMetadata])
    when(metadata.topicIdPartition()).thenReturn(disklessTopicPartition)
    val batchInfo = mock(classOf[BatchInfo])
    when(batchInfo.metadata()).thenReturn(metadata)
    val batchResponse = mock(classOf[FindBatchResponse])
    when(batchResponse.batches()).thenReturn(util.List.of(batchInfo))
    when(batchResponse.highWatermark()).thenReturn(110L)
    when(batchResponse.estimatedByteSize(50L)).thenReturn(RECORDS.sizeInBytes())
    when(batchResponse.errors()).thenReturn(Errors.NONE)
    val controlPlane = mock(classOf[ControlPlane])
    when(controlPlane.findBatches(any(), any(), any())).thenReturn(util.List.of(batchResponse))

    val rm = spy(createReplicaManager(
      List(disklessTopicPartition.topic()),
      controlPlane = Some(controlPlane),
      disklessManagedReplicasEnabled = true,
    ))

    // classicToDisklessStartOffset == -1: the partition was never migrated, i.e. fully diskless.
    when(rm.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition()))
      .thenReturn(PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET)

    val params = new FetchParams(
      -1, -1L,
      0L, 1, 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty())
    val infos = Seq(
      disklessTopicPartition -> new PartitionData(disklessTopicPartition.topicId(), 50L, 0L, 1024, Optional.empty()))

    @volatile var result: Map[TopicIdPartition, FetchPartitionData] = null
    rm.fetchMessages(params, infos, QuotaFactory.UNBOUNDED_QUOTA,
      (response: Seq[(TopicIdPartition, FetchPartitionData)]) => result = response.toMap)

    // A fully diskless partition bypasses the UnifiedLog entirely and is served
    // by the diskless fetch handler.
    assertNotNull(result)
    assertEquals(1, result.size)
    assertEquals(Errors.NONE, result(disklessTopicPartition).error)
    verify(rm, never()).readFromLog(any(), any(), any(), any())
    verify(handlerCtor.constructed().get(0), times(1)).handle(any(), any())
  } finally {
    handlerCtor.close()
  }
}

@Test
def testFetchFailDisklessWhenFromReplicaAndUnmanagedReplicas(): Unit = {
val fetchHandlerCtor = mockFetchHandler(Map.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
Expand Down Expand Up @@ -1929,6 +1930,14 @@ public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
if (validateOnly) {
return result.withoutRecords();
} else {
List<ApiMessageAndVersion> migrationRecords =
replicationControl.markClassicToDisklessMigrationStarted(configChanges, result.response());
if (!migrationRecords.isEmpty()) {
List<ApiMessageAndVersion> allRecords = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
allRecords.addAll(result.records());
allRecords.addAll(migrationRecords);
return ControllerResult.of(allRecords, result.response());
Comment thread
giuseppelillo marked this conversation as resolved.
}
return result;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ ControllerResult<InitDisklessLogResponseData> initDisklessLog(
continue;
}

if (partition.classicToDisklessStartOffset != PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
if (partition.classicToDisklessStartOffset >= 0) {
log.info("Rejecting InitDisklessLog request from node {} for {}-{} because " +
"the partition is already initialized with classicToDisklessStartOffset={}.",
request.brokerId(), topic.name, partitionId, partition.classicToDisklessStartOffset);
Expand Down Expand Up @@ -2875,6 +2875,47 @@ private void validatePartitionReplicationFactorUnchanged(PartitionRegistration p
}
}

List<ApiMessageAndVersion> markClassicToDisklessMigrationStarted(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some tests for this one?

Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
Map<ConfigResource, ApiError> configResults
) {
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> entry : configChanges.entrySet()) {
ConfigResource resource = entry.getKey();
if (resource.type() != TOPIC) continue;
ApiError error = configResults.get(resource);
if (error != null && error != ApiError.NONE) continue;
Map<String, Entry<OpType, String>> changes = entry.getValue();
Entry<OpType, String> disklessChange = changes.get(DISKLESS_ENABLE_CONFIG);
if (disklessChange == null) continue;
if (disklessChange.getKey() != SET || !Boolean.parseBoolean(disklessChange.getValue())) continue;
if (isDisklessTopic(resource.name())) continue;

String topicName = resource.name();
Uuid topicId = topicsByName.get(topicName);
if (topicId == null) continue;
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) continue;

int sizeBefore = records.size();
for (Entry<Integer, PartitionRegistration> partEntry : topicInfo.parts.entrySet()) {
PartitionRegistration partition = partEntry.getValue();
if (partition.classicToDisklessStartOffset == PartitionRegistration.NO_CLASSIC_TO_DISKLESS_START_OFFSET) {
PartitionChangeRecord record = new PartitionChangeRecord()
.setTopicId(topicId)
.setPartitionId(partEntry.getKey());
record.unknownTaggedFields().add(
InitDisklessLogFields.encodeClassicToDisklessStartOffset(
PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING));
records.add(new ApiMessageAndVersion(record, (short) 0));
}
}
log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending",
records.size() - sizeBefore, topicName);
}
Comment thread
giuseppelillo marked this conversation as resolved.
return records;
}

private boolean isDisklessTopic(String topicName) {
return Boolean.parseBoolean(
configurationControl.currentTopicConfig(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public PartitionRegistration build() {
}

public static final long NO_CLASSIC_TO_DISKLESS_START_OFFSET = -1L;
public static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L;

public final int[] replicas;
public final Uuid[] directories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6119,6 +6119,80 @@ public void testInitDisklessLogMultipleProducerStates() {
assertEquals(2L, updatedPartition.disklessProducerStates.get(1).producerId());
assertEquals(3L, updatedPartition.disklessProducerStates.get(2).producerId());
}

@Test
public void testInitDisklessLogAcceptsMigrationPendingPartition() {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
    ReplicationControlManager replication = ctx.replicationControl;
    ctx.registerBrokers(0, 1, 2);
    ctx.unfenceBrokers(0, 1, 2);
    CreatableTopicResult created = ctx.createTestTopic("foo",
        new int[][] {new int[] {0, 1, 2}});
    Uuid topicId = created.topicId();

    // Put partition 0 into the migration-pending state (-2) by replaying the same
    // PartitionChangeRecord the controller emits when a migration starts.
    PartitionChangeRecord pendingRecord = new PartitionChangeRecord()
        .setTopicId(topicId)
        .setPartitionId(0);
    pendingRecord.unknownTaggedFields().add(
        InitDisklessLogFields.encodeClassicToDisklessStartOffset(
            PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING));
    ctx.replay(List.of(new ApiMessageAndVersion(pendingRecord, (short) 0)));

    PartitionRegistration pending = replication.getPartition(topicId, 0);
    assertEquals(PartitionRegistration.CLASSIC_TO_DISKLESS_MIGRATION_PENDING,
        pending.classicToDisklessStartOffset);

    // A pending (-2) partition must still accept InitDisklessLog.
    ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION);
    InitDisklessLogRequestData request = singlePartitionRequest(
        0, defaultBrokerEpoch(0), topicId, 0, 100L, pending.leaderEpoch, List.of());
    ControllerResult<InitDisklessLogResponseData> result =
        replication.initDisklessLog(requestContext, request);

    assertEquals(1, result.records().size());
    assertEquals(NONE.code(),
        result.response().topics().get(0).partitions().get(0).errorCode());

    // Replaying the generated records commits the real start offset.
    ctx.replay(result.records());
    assertEquals(100L, replication.getPartition(topicId, 0).classicToDisklessStartOffset);
}

@Test
public void testInitDisklessLogRejectsAlreadyInitializedPartition() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between this test and testInitDisklessLogAlreadyInitializedPartitionRejected?

ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
new int[][] {new int[] {0, 1, 2}});

Uuid topicId = createTopicResult.topicId();
PartitionRegistration partition = replicationControl.getPartition(topicId, 0);

ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.ALTER_PARTITION);

// First init succeeds
InitDisklessLogRequestData firstRequest = singlePartitionRequest(
0, defaultBrokerEpoch(0), topicId, 0, 100L, partition.leaderEpoch, List.of());
ControllerResult<InitDisklessLogResponseData> firstResult =
replicationControl.initDisklessLog(requestContext, firstRequest);
ctx.replay(firstResult.records());
assertEquals(NONE.code(),
firstResult.response().topics().get(0).partitions().get(0).errorCode());

// Second init with different offset is rejected (offset >= 0 means already committed)
PartitionRegistration updatedPartition = replicationControl.getPartition(topicId, 0);
InitDisklessLogRequestData secondRequest = singlePartitionRequest(
0, defaultBrokerEpoch(0), topicId, 0, 200L, updatedPartition.leaderEpoch, List.of());
ControllerResult<InitDisklessLogResponseData> secondResult =
replicationControl.initDisklessLog(requestContext, secondRequest);
assertEquals(0, secondResult.records().size());
assertEquals(INVALID_REQUEST.code(),
secondResult.response().topics().get(0).partitions().get(0).errorCode());
}
}

}
Loading
Loading