
feat(inkless): Set migrating partitions in KRaft metadata #580

Draft

giuseppelillo wants to merge 1 commit into main from giuseppelillo/fix-fetch-routing-diskless

Conversation

@giuseppelillo
Contributor

No description provided.


Copilot AI left a comment


Pull request overview

Adds a “migration pending” state for classic→diskless partition migration and wires it through controller metadata updates and broker fetch behavior so partitions can be marked pending in KRaft metadata before the actual diskless start offset is committed.

Changes:

  • Introduce CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2 as a new sentinel for classicToDisklessStartOffset.
  • Accept classicToDisklessStartOffset = -2 in InitDisklessLog and add controller logic to mark partitions pending when enabling diskless via incremental alter configs.
  • Update ReplicaManager fetch routing and add unit tests covering pending-state round trips, merges, and fetch/init behavior.
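The sentinel scheme above can be sketched as follows. The constant name `CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2` comes from the PR; the `-1` "uninitialized" value and the `classify` helper are assumptions for illustration (the PR only says fetch routing handles "any negative→nonnegative transition"):

```java
// Sketch of the classicToDisklessStartOffset sentinel semantics described in the PR.
// CLASSIC_TO_DISKLESS_MIGRATION_PENDING matches the PR; the rest is illustrative.
public class DisklessMigrationStateSketch {
    // Assumed sentinel for "no migration initialized" (not stated explicitly in the PR).
    static final long UNINITIALIZED_OFFSET = -1L;
    // New sentinel introduced by this PR: migration requested, start offset not yet committed.
    static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L;

    enum State { UNINITIALIZED, PENDING, INITIALIZED }

    // Non-negative offsets mean the diskless log is already initialized;
    // -2 means migration is pending; any other negative value means uninitialized.
    static State classify(long classicToDisklessStartOffset) {
        if (classicToDisklessStartOffset >= 0) return State.INITIALIZED;
        if (classicToDisklessStartOffset == CLASSIC_TO_DISKLESS_MIGRATION_PENDING) return State.PENDING;
        return State.UNINITIALIZED;
    }
}
```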

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java | Adds a new sentinel constant for “migration pending”. |
| metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java | Treats non-negative offsets as “already initialized” and introduces a helper to mark partitions migration-pending. |
| metadata/src/main/java/org/apache/kafka/controller/QuorumController.java | Appends migration-pending partition records during incrementalAlterConfigs when diskless is enabled. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Routes fetches to UnifiedLog when migration is pending and adjusts the control-plane init trigger to handle any negative→non-negative transition. |
| metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java | Adds tests for pending-state record round-trip and merge semantics. |
| metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java | Adds tests ensuring InitDisklessLog accepts pending partitions and rejects already-initialized partitions. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | Adds fetch behavior tests for migration-pending partitions under managed/unmanaged replicas. |


```java
List<ApiMessageAndVersion> migrationRecords =
    replicationControl.markClassicToDisklessMigrationStarted(configChanges);
if (!migrationRecords.isEmpty()) {
    List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
```

Copilot AI Apr 23, 2026


The controller-wide MAX_RECORDS_PER_USER_OP limit is enforced in other managers by using BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP), but here allRecords is a plain ArrayList and can grow beyond the bound when adding per-partition migration records (e.g., large topics). This risks very large metadata batches / OOM and bypasses the intended user-op record cap. Consider using a bounded list (or explicitly validating result.records().size() + migrationRecords.size() against MAX_RECORDS_PER_USER_OP) and failing the request if it exceeds the limit.

Suggested change:

```diff
-List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
+int totalRecords = result.records().size() + migrationRecords.size();
+if (totalRecords > MAX_RECORDS_PER_USER_OP) {
+    throw new InvalidRequestException("incrementalAlterConfigs generated " +
+        totalRecords + " metadata records, which exceeds the maximum of " +
+        MAX_RECORDS_PER_USER_OP + " records per user operation.");
+}
+List<ApiMessageAndVersion> allRecords = new ArrayList<>(totalRecords);
+allRecords.addAll(result.records());
```

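The cap check the reviewer suggests can be sketched generically. `MAX_RECORDS_PER_USER_OP` is a real controller-side constant in Kafka, but its value here and the `combineBounded` helper are illustrative assumptions, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the record-cap guard suggested in the review: validate the combined
// record count before building the merged list, and fail fast if it exceeds the cap.
public class RecordCapGuard {
    // Illustrative value; Kafka defines its own MAX_RECORDS_PER_USER_OP.
    static final int MAX_RECORDS_PER_USER_OP = 10000;

    static <T> List<T> combineBounded(List<T> base, List<T> extra) {
        int total = base.size() + extra.size();
        if (total > MAX_RECORDS_PER_USER_OP) {
            throw new IllegalArgumentException("operation generated " + total
                + " metadata records, exceeding the cap of " + MAX_RECORDS_PER_USER_OP);
        }
        // Pre-size the list to the exact total to avoid resizing.
        List<T> all = new ArrayList<>(total);
        all.addAll(base);
        all.addAll(extra);
        return all;
    }
}
```

An alternative, as the review notes, is `BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP)`, which enforces the bound on every `add` instead of up front.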
Comment on lines +2881 to +2888
```java
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> entry : configChanges.entrySet()) {
    ConfigResource resource = entry.getKey();
    if (resource.type() != TOPIC) continue;
    Map<String, Entry<OpType, String>> changes = entry.getValue();
    Entry<OpType, String> disklessChange = changes.get(DISKLESS_ENABLE_CONFIG);
    if (disklessChange == null) continue;
    if (disklessChange.getKey() != SET || !Boolean.parseBoolean(disklessChange.getValue())) continue;
```

Copilot AI Apr 23, 2026


markClassicToDisklessMigrationStarted builds an unbounded ArrayList of records. Enabling diskless on a large topic can add one PartitionChangeRecord per partition, potentially exceeding MAX_RECORDS_PER_USER_OP and producing huge controller batches. Consider using BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP) (or otherwise enforcing a strict upper bound / splitting strategy) so this path can’t generate unbounded record counts.

Comment on lines +2909 to +2911
```java
log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending",
    records.size(), topicName);
}
```

Copilot AI Apr 23, 2026


The log message uses records.size() which is cumulative across all processed topics in configChanges, so when multiple topics are updated in one request it will log the wrong per-topic partition count. Track a per-topic count (or compute the delta before/after processing the topic) so the log reflects how many partitions were marked pending for that specific topic.

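The delta-counting fix the reviewer describes can be sketched as follows. The method, its parameters, and the stand-in record objects are all hypothetical; only the before/after delta idea comes from the review:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-topic counting: snapshot the record list size before processing
// each topic and report the delta, rather than the cumulative records.size().
public class PerTopicCountSketch {
    static Map<String, Integer> markPending(Map<String, Integer> partitionsByTopic) {
        List<Object> records = new ArrayList<>();
        Map<String, Integer> markedPerTopic = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : partitionsByTopic.entrySet()) {
            int before = records.size();
            for (int p = 0; p < e.getValue(); p++) {
                records.add(new Object()); // stand-in for one PartitionChangeRecord
            }
            // Report the per-topic delta, not the cumulative records.size().
            markedPerTopic.put(e.getKey(), records.size() - before);
        }
        return markedPerTopic;
    }
}
```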
Comment on lines +1933 to +1938
```java
List<ApiMessageAndVersion> migrationRecords =
    replicationControl.markClassicToDisklessMigrationStarted(configChanges);
if (!migrationRecords.isEmpty()) {
    List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
    allRecords.addAll(migrationRecords);
    return ControllerResult.of(allRecords, result.response());
```

Copilot AI Apr 23, 2026


markClassicToDisklessMigrationStarted is called with the original configChanges map, but incrementalAlterConfigs can partially fail per resource. This can generate PartitionChangeRecords that mark partitions as migration-pending even when the DISKLESS_ENABLE_CONFIG update for that topic was rejected (non-NONE ApiError), leaving metadata inconsistent. Consider filtering to only topics whose config update succeeded (ApiError.NONE) before generating migration records, or passing the successful resources/results into markClassicToDisklessMigrationStarted.

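The filtering the reviewer proposes can be sketched generically. The helper, its generic signature, and the boolean success map are illustrative stand-ins for Kafka's `ConfigResource`/`ApiError` types; the point is to drop resources whose per-resource result was not `ApiError.NONE` before generating migration records:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of filtering config changes down to the resources whose update succeeded,
// so migration records are only generated for topics actually switched to diskless.
public class SuccessfulChangeFilter {
    static <R, C> Map<R, C> onlySuccessful(Map<R, C> configChanges, Map<R, Boolean> succeeded) {
        Map<R, C> filtered = new LinkedHashMap<>();
        for (Map.Entry<R, C> e : configChanges.entrySet()) {
            // Keep only resources whose result maps to success (ApiError.NONE in Kafka).
            if (Boolean.TRUE.equals(succeeded.get(e.getKey()))) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }
}
```

With this in place, `markClassicToDisklessMigrationStarted` would receive only the successful subset instead of the original `configChanges` map.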