feat(inkless): Set migrating partitions in KRaft metadata #580

giuseppelillo wants to merge 1 commit into main
Conversation
Pull request overview
Adds a “migration pending” state for classic→diskless partition migration and wires it through controller metadata updates and broker fetch behavior so partitions can be marked pending in KRaft metadata before the actual diskless start offset is committed.
Changes:
- Introduce `CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2` as a new sentinel for `classicToDisklessStartOffset`.
- Accept `classicToDisklessStartOffset = -2` in `InitDisklessLog` and add controller logic to mark partitions pending when enabling diskless via incremental alter configs.
- Update ReplicaManager fetch routing and add unit tests covering pending-state round trips, merges, and fetch/init behavior.
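The sentinel semantics described above can be sketched roughly as follows. Only `CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2` and the "non-negative means already initialized" rule come from this PR; the `-1` uninitialized value, the class, and the helper method names are illustrative assumptions.

```java
// Hypothetical sketch of the classicToDisklessStartOffset sentinel values.
// Only CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2 comes from this PR; the
// -1 "uninitialized" value and the method names are assumptions.
class DisklessOffsets {
    static final long UNINITIALIZED = -1L;                          // assumption
    static final long CLASSIC_TO_DISKLESS_MIGRATION_PENDING = -2L;  // from the PR

    /** Non-negative offsets mean the diskless log is already initialized. */
    static boolean isInitialized(long classicToDisklessStartOffset) {
        return classicToDisklessStartOffset >= 0;
    }

    /** Pending partitions still serve fetches from the classic UnifiedLog. */
    static boolean isMigrationPending(long classicToDisklessStartOffset) {
        return classicToDisklessStartOffset == CLASSIC_TO_DISKLESS_MIGRATION_PENDING;
    }
}
```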
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java | Adds a new sentinel constant for “migration pending”. |
| metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java | Treats non-negative offsets as “already initialized” and introduces a helper to mark partitions migration-pending. |
| metadata/src/main/java/org/apache/kafka/controller/QuorumController.java | Appends migration-pending partition records during incrementalAlterConfigs when diskless is enabled. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | Routes fetches to UnifiedLog when migration is pending and adjusts control-plane init trigger to handle any negative→nonnegative transition. |
| metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java | Adds tests for pending-state record round-trip and merge semantics. |
| metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java | Adds tests ensuring InitDisklessLog accepts pending partitions and rejects already-initialized partitions. |
| core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | Adds fetch behavior tests for migration-pending partitions under managed/unmanaged replicas. |
```java
List<ApiMessageAndVersion> migrationRecords =
    replicationControl.markClassicToDisklessMigrationStarted(configChanges);
if (!migrationRecords.isEmpty()) {
    List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
```
The controller-wide MAX_RECORDS_PER_USER_OP limit is enforced in other managers by using BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP), but here allRecords is a plain ArrayList and can grow beyond the bound when adding per-partition migration records (e.g., large topics). This risks very large metadata batches / OOM and bypasses the intended user-op record cap. Consider using a bounded list (or explicitly validating result.records().size() + migrationRecords.size() against MAX_RECORDS_PER_USER_OP) and failing the request if it exceeds the limit.
Suggested change:

```diff
-List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
+int totalRecords = result.records().size() + migrationRecords.size();
+if (totalRecords > MAX_RECORDS_PER_USER_OP) {
+    throw new InvalidRequestException("incrementalAlterConfigs generated " +
+        totalRecords + " metadata records, which exceeds the maximum of " +
+        MAX_RECORDS_PER_USER_OP + " records per user operation.");
+}
+List<ApiMessageAndVersion> allRecords = new ArrayList<>(totalRecords);
+allRecords.addAll(result.records());
```
```java
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> entry : configChanges.entrySet()) {
    ConfigResource resource = entry.getKey();
    if (resource.type() != TOPIC) continue;
    Map<String, Entry<OpType, String>> changes = entry.getValue();
    Entry<OpType, String> disklessChange = changes.get(DISKLESS_ENABLE_CONFIG);
    if (disklessChange == null) continue;
    if (disklessChange.getKey() != SET || !Boolean.parseBoolean(disklessChange.getValue())) continue;
```
markClassicToDisklessMigrationStarted builds an unbounded ArrayList of records. Enabling diskless on a large topic can add one PartitionChangeRecord per partition, potentially exceeding MAX_RECORDS_PER_USER_OP and producing huge controller batches. Consider using BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP) (or otherwise enforcing a strict upper bound / splitting strategy) so this path can’t generate unbounded record counts.
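A minimal illustration of the bounded-list idea suggested above. This is a simplified, self-contained stand-in for Kafka's `BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP)`, not the real class: an `ArrayList` that refuses to grow past a fixed cap, so one user op cannot emit unbounded record counts.

```java
import java.util.ArrayList;
import java.util.Collection;

// Simplified stand-in for Kafka's BoundedList: an ArrayList that throws once
// a fixed cap is reached, so a single user operation cannot accumulate an
// unbounded number of metadata records.
class BoundedRecordList<E> extends ArrayList<E> {
    private final int maxLength;

    BoundedRecordList(int maxLength) {
        this.maxLength = maxLength;
    }

    @Override
    public boolean add(E e) {
        if (size() >= maxLength) {
            throw new IllegalStateException(
                "Cannot add more than " + maxLength + " records per user operation.");
        }
        return super.add(e);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        boolean changed = false;
        for (E e : c) {       // route every element through the bounded add()
            add(e);
            changed = true;
        }
        return changed;
    }
}
```

With this in place, exceeding the cap fails the request loudly instead of silently building a huge controller batch.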
```java
    log.info("Marked {} partition(s) for topic {} as classic-to-diskless migration pending",
        records.size(), topicName);
}
```
The log message uses records.size() which is cumulative across all processed topics in configChanges, so when multiple topics are updated in one request it will log the wrong per-topic partition count. Track a per-topic count (or compute the delta before/after processing the topic) so the log reflects how many partitions were marked pending for that specific topic.
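The counting pattern this comment asks for can be sketched as below: snapshot the cumulative list's size before processing each topic and log the delta. The record type and helper are stand-ins, not the PR's actual code; only the before/after counting matters.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the per-topic count fix: records accumulates across all topics,
// so the per-topic count must be computed as a delta, not records.size().
class MigrationMarker {
    final List<String> records = new ArrayList<>();  // stands in for PartitionChangeRecords

    /** Appends one pending-record per partition; returns the count for THIS topic only. */
    int markTopicPending(String topicName, int partitionCount) {
        int sizeBeforeTopic = records.size();        // snapshot before this topic
        for (int p = 0; p < partitionCount; p++) {
            records.add(topicName + "-" + p);
        }
        return records.size() - sizeBeforeTopic;     // delta, not cumulative total
    }
}
```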
```java
List<ApiMessageAndVersion> migrationRecords =
    replicationControl.markClassicToDisklessMigrationStarted(configChanges);
if (!migrationRecords.isEmpty()) {
    List<ApiMessageAndVersion> allRecords = new ArrayList<>(result.records());
    allRecords.addAll(migrationRecords);
    return ControllerResult.of(allRecords, result.response());
```
markClassicToDisklessMigrationStarted is called with the original configChanges map, but incrementalAlterConfigs can partially fail per resource. This can generate PartitionChangeRecords that mark partitions as migration-pending even when the DISKLESS_ENABLE_CONFIG update for that topic was rejected (non-NONE ApiError), leaving metadata inconsistent. Consider filtering to only topics whose config update succeeded (ApiError.NONE) before generating migration records, or passing the successful resources/results into markClassicToDisklessMigrationStarted.
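The filtering this comment proposes can be sketched generically: keep only the resources whose config update succeeded before handing the map to the migration-record generator. The types here are placeholders for Kafka's `ConfigResource` and per-resource `ApiError` results; a boolean map stands in for the "error is `ApiError.NONE`" check.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of filtering configChanges down to topics whose update succeeded,
// so migration records are only generated for accepted config changes.
// The boolean map stands in for "per-resource ApiError == ApiError.NONE".
class SuccessfulChangeFilter {
    static <R, C> Map<R, C> onlySuccessful(Map<R, C> configChanges,
                                           Map<R, Boolean> updateSucceeded) {
        Map<R, C> filtered = new HashMap<>();
        for (Map.Entry<R, C> entry : configChanges.entrySet()) {
            if (Boolean.TRUE.equals(updateSucceeded.get(entry.getKey()))) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

Passing only the filtered map into `markClassicToDisklessMigrationStarted` would keep rejected topics out of the pending-migration records.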