Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Inkless
* Copyright (C) 2024 - 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.aiven.inkless.common.ByteRange;

/**
 * {@link KeyAlignmentStrategy} that merges every requested byte range into one
 * covering range. The range spans from the smallest start offset to the largest
 * end ({@code offset + size}) among the inputs.
 *
 * <p>Intended for the cold (lagging consumer) path. There, a single HTTP request
 * for the covering range is cheaper than several requests that skip over small
 * gaps of interleaved data belonging to other partitions.
 */
public class BoundingRangeAlignment implements KeyAlignmentStrategy {

    /**
     * Returns the single range that covers all of {@code ranges}, gaps included.
     *
     * @param ranges the ranges to cover; may be {@code null} or empty
     * @return an empty set when there is nothing to cover; otherwise a one-element
     *         set holding the covering range
     */
    @Override
    public Set<ByteRange> align(List<ByteRange> ranges) {
        if (ranges == null || ranges.isEmpty()) {
            return Collections.emptySet();
        }

        // The list is non-empty here, so both reductions always yield a value.
        final long start = ranges.stream()
            .mapToLong(ByteRange::offset)
            .min()
            .getAsLong();
        final long end = ranges.stream()
            .mapToLong(range -> range.offset() + range.size())
            .max()
            .getAsLong();

        return Set.of(new ByteRange(start, end - start));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Stream;

import io.aiven.inkless.TimeUtils;
import io.aiven.inkless.cache.BoundingRangeAlignment;
import io.aiven.inkless.cache.KeyAlignmentStrategy;
import io.aiven.inkless.cache.ObjectCache;
import io.aiven.inkless.common.ByteRange;
Expand Down Expand Up @@ -61,6 +62,8 @@
*/
public class FetchPlanner implements Supplier<List<FetchPlanner.FetchRequestWithFuture>> {

/**
 * Range strategy applied when a fetch is classified as lagging (cold path).
 * It collapses all of an object's batch ranges into one bounding range.
 * A single static instance is shared because BoundingRangeAlignment declares no fields.
 */
private static final KeyAlignmentStrategy COLD_PATH_ALIGNMENT = new BoundingRangeAlignment();

/** Clock used to compute data age for the hot/cold path decision. */
private final Time time;
/** Builds {@code ObjectKey} instances from storage object key strings. */
private final ObjectKeyCreator objectKeyCreator;
/** Range strategy applied on the hot path (fetches not classified as lagging). */
private final KeyAlignmentStrategy keyAlignment;
Expand Down Expand Up @@ -156,7 +159,9 @@ List<ObjectFetchRequest> planJobs(final Map<TopicIdPartition, FindBatchResponse>

/**
* Creates fetch requests for a single object with multiple batches.
* Aligns byte ranges and aggregates metadata (timestamp for hot/cold path decision).
* Aggregates metadata and selects range strategy based on data recency:
* - Hot path (recent data): aligns ranges for cache efficiency
* - Cold path (lagging consumers): single bounding range to minimize HTTP requests
*/
private Stream<ObjectFetchRequest> createFetchRequests(
final String objectKey,
Expand All @@ -179,36 +184,49 @@ private Stream<ObjectFetchRequest> createFetchRequests(
.map(b -> b.metadata().range())
.collect(Collectors.toList());

// Align byte ranges for efficient fetching
final Set<ByteRange> alignedRanges = keyAlignment.align(byteRanges);
// Compute the lagging decision once during planning so range strategy and execution path stay consistent.
final boolean lagging = isLagging(timestamp);

// Select range strategy based on data recency:
// - Hot path: align to fixed blocks for cache hit rate
// - Cold path: single bounding range to minimize HTTP requests (cache is bypassed anyway)
final Set<ByteRange> fetchRanges = lagging
? COLD_PATH_ALIGNMENT.align(byteRanges)
: keyAlignment.align(byteRanges);
Comment thread
jeqo marked this conversation as resolved.

// Create a fetch request for each aligned range with aggregated metadata
return alignedRanges.stream()
// Create a fetch request for each range with aggregated metadata
return fetchRanges.stream()
.map(byteRange -> new ObjectFetchRequest(
objectKeyCreator.from(objectKey),
byteRange,
timestamp
timestamp,
lagging
));
}

/**
 * Decides whether data carrying {@code timestamp} should be served through the cold (lagging) path.
 * The answer is computed once at planning time and travels with the request via
 * {@link ObjectFetchRequest#lagging()}. This keeps the range strategy (aligned vs bounding) and the
 * execution path (cache vs bypass) in agreement.
 *
 * @param timestamp the maximum batch timestamp for the object being fetched
 * @return {@code false} when the lagging feature is not fully configured; otherwise whether the
 *         data age exceeds {@code laggingConsumerThresholdMs}
 */
private boolean isLagging(final long timestamp) {
    // The cold path only exists when both of its collaborators are configured.
    if (laggingFetchDataExecutor == null || laggingObjectFetcher == null) {
        return false;
    }
    // Clamp at zero so a timestamp ahead of the local clock (skew) counts as recent data.
    final long ageMs = Math.max(0, time.milliseconds() - timestamp);
    return ageMs > laggingConsumerThresholdMs;
}

/**
 * Submits every planned request, preserving input order.
 *
 * @param requests the planned fetch requests
 * @return one {@link FetchRequestWithFuture} per request, pairing it with its pending result
 */
private List<FetchRequestWithFuture> submitAllRequests(final List<ObjectFetchRequest> requests) {
    return requests.stream()
        .map(this::submitAndPair)
        .collect(Collectors.toList());
}

/** Submits a single request and pairs it with the future returned by submitSingleRequest. */
private FetchRequestWithFuture submitAndPair(final ObjectFetchRequest request) {
    final CompletableFuture<FileExtent> pending = submitSingleRequest(request);
    return new FetchRequestWithFuture(request, pending);
}

private CompletableFuture<FileExtent> submitSingleRequest(final ObjectFetchRequest request) {
final long currentTime = time.milliseconds();
// If timestamp is in the future (clock skew), treat as recent data (hot path)
// Math.max ensures dataAge is never negative, which would incorrectly route future timestamps
final long dataAge = Math.max(0, currentTime - request.timestamp());

// Lagging consumer feature is enabled only when BOTH laggingFetchDataExecutor AND laggingObjectFetcher are non-null.
// If either is null, the feature is disabled and all requests are treated as recent (hot path).
final boolean laggingFeatureEnabled = laggingFetchDataExecutor != null && laggingObjectFetcher != null;
final boolean isLagging = laggingFeatureEnabled && (dataAge > laggingConsumerThresholdMs);

if (!isLagging) {
if (!request.lagging()) {
// Hot path: up-to-date consumers use cache + recentDataExecutor
metrics.recordRecentDataRequest();
return cache.computeIfAbsent(
Expand Down Expand Up @@ -315,14 +333,17 @@ private FileExtent fetchFileExtent(final ObjectFetcher fetcher, final ObjectFetc
*
* @param objectKey the storage object key
* @param byteRange the range of bytes to fetch
* @param timestamp the maximum timestamp from batches (for hot/cold path decision).
* @param timestamp the maximum timestamp from batches.
* Using max instead of min because if ANY batch in the object is recent,
* we treat the entire fetch as hot path to prioritize recent data access.
* @param lagging pre-computed lagging decision from planning phase, ensuring range strategy
* (aligned vs bounding) and execution path (cache vs bypass) stay consistent.
*/
record ObjectFetchRequest(
ObjectKey objectKey,
ByteRange byteRange,
long timestamp
long timestamp,
boolean lagging
) {
/**
* Converts to cache key for deduplication.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Inkless
* Copyright (C) 2024 - 2026 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.cache;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.aiven.inkless.common.ByteRange;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests for {@link BoundingRangeAlignment}.
 *
 * <p>Comments use half-open interval notation {@code [start,end)}, where
 * {@code new ByteRange(offset, size)} covers {@code [offset, offset + size)}.
 */
class BoundingRangeAlignmentTest {

    final KeyAlignmentStrategy strategy = new BoundingRangeAlignment();

    @Test
    void nullInput() {
        assertThat(strategy.align(null)).isEmpty();
    }

    @Test
    void emptyInput() {
        assertThat(strategy.align(Collections.emptyList())).isEmpty();
    }

    @Test
    void singleRange() {
        assertThat(strategy.align(List.of(new ByteRange(10, 20))))
            .isEqualTo(Set.of(new ByteRange(10, 20)));
    }

    @Test
    void nonOverlappingRanges() {
        // [0,10) and [20,30) — bounding range is [0,30)
        assertThat(strategy.align(List.of(
            new ByteRange(0, 10),
            new ByteRange(20, 10)
        ))).isEqualTo(Set.of(new ByteRange(0, 30)));
    }

    @Test
    void overlappingRanges() {
        // [0,15) and [10,20) — bounding range is [0,20)
        assertThat(strategy.align(List.of(
            new ByteRange(0, 15),
            new ByteRange(10, 10)
        ))).isEqualTo(Set.of(new ByteRange(0, 20)));
    }

    @Test
    void unsortedInput() {
        // Reverse order: [100,150) before [0,20) — bounding range is [0,150)
        assertThat(strategy.align(List.of(
            new ByteRange(100, 50),
            new ByteRange(0, 20)
        ))).isEqualTo(Set.of(new ByteRange(0, 150)));
    }

    @Test
    void containedRange() {
        // [10,15) lies entirely inside [0,100) — bounding range is [0,100)
        assertThat(strategy.align(List.of(
            new ByteRange(0, 100),
            new ByteRange(10, 5)
        ))).isEqualTo(Set.of(new ByteRange(0, 100)));
    }

    @Test
    void duplicateRanges() {
        // [5,15) twice — bounding range is [5,15)
        assertThat(strategy.align(List.of(
            new ByteRange(5, 10),
            new ByteRange(5, 10)
        ))).isEqualTo(Set.of(new ByteRange(5, 10)));
    }

    @Test
    void adjacentRanges() {
        // [0,10) and [10,20) touch without a gap — bounding range is [0,20)
        assertThat(strategy.align(List.of(
            new ByteRange(0, 10),
            new ByteRange(10, 10)
        ))).isEqualTo(Set.of(new ByteRange(0, 20)));
    }
}
Loading
Loading