Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add high performance readVectored API #1072

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.function.IntFunction;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
Expand Down Expand Up @@ -63,6 +66,7 @@ class GoogleHadoopFSInputStream extends FSInputStream implements IOStatisticsSou
private final FileSystem.Statistics statistics;
// Statistic tracker of the Input stream
private final GhfsInputStreamStatistics streamStatistics;
private final VectoredIOImpl vectoredIO;

static GoogleHadoopFSInputStream create(
GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics)
Expand Down Expand Up @@ -94,6 +98,20 @@ private GoogleHadoopFSInputStream(
this.channel = channel;
this.statistics = statistics;
this.streamStatistics = ghfs.getInstrumentation().newInputStreamStatistics(statistics);
this.vectoredIO = VectoredIOImpl.create(ghfs, gcsPath, statistics);
}

/**
 * {@inheritDoc} Vectored read implementation for GoogleHadoopFSInputStream; delegates the
 * actual range reads to {@link VectoredIOImpl}.
 *
 * @param ranges the byte ranges to read.
 * @param allocate the function to allocate ByteBuffer.
 * @throws IOException if the stream has been closed, or if the read fails.
 */
@Override
public void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
    throws IOException {
  // Guard against use-after-close: close() shuts down vectoredIO's thread pool, so a late
  // call would otherwise surface as an obscure executor rejection instead of the standard
  // closed-stream error.
  if (isClosed) {
    throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
  }
  vectoredIO.readVectored(ranges, allocate);
}

@Override
Expand Down Expand Up @@ -187,6 +205,7 @@ public synchronized void close() throws IOException {

if (!isClosed) {
streamStatistics.close();
vectoredIO.close();
}
}

Expand Down
195 changes: 195 additions & 0 deletions gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/VectoredIOImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.google.cloud.hadoop.fs.gcs;

import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.VectoredReadUtils;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.util.functional.FutureIO;

/**
 * Implements Hadoop's vectored-read API for GCS.
 *
 * <p>Incoming ranges are validated (non-overlapping) and sorted by offset. Ranges that are
 * already ordered and disjoint are read individually in parallel on a bounded thread pool;
 * otherwise nearby ranges are merged into combined reads whose results are sliced back into
 * the caller's per-range futures.
 */
public class VectoredIOImpl {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  private final FileSystem.Statistics statistics;
  private final URI gcsPath;
  // Read options tuned for vectored reads: sequential fadvise, no fast-fail on not-found.
  private final GoogleCloudStorageReadOptions readVectoredOptions;
  private final GoogleHadoopFileSystem ghfs;
  // Maximum number of concurrent range reads.
  private final int maxReadVectoredParallelism = 8;
  // Upper bound on the size of a single merged read (128 MiB).
  private final int maxReadSizeForVectorReads = 128 * 1024 * 1024;
  // Gaps smaller than this (4 KiB) are cheaper to read through than to seek over.
  private final int minSeekForVectorReads = 4 * 1024;
  private final ExecutorService boundedThreadPool;

  /** Static factory; see {@link #VectoredIOImpl}. */
  static VectoredIOImpl create(
      GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics) {
    return new VectoredIOImpl(ghfs, gcsPath, statistics);
  }

  private VectoredIOImpl(
      GoogleHadoopFileSystem ghfs, URI gcsPath, FileSystem.Statistics statistics) {
    this.ghfs = ghfs;
    this.gcsPath = gcsPath;
    this.statistics = statistics;
    this.readVectoredOptions =
        getReadVectoredOptions(
            ghfs.getGcsFs().getOptions().getCloudStorageOptions().getReadChannelOptions());
    this.boundedThreadPool = Executors.newFixedThreadPool(maxReadVectoredParallelism);
  }

  /**
   * Reads data from Google Cloud Storage using vectored I/O operations.
   *
   * @param ranges List of file ranges to read; each range's future is populated.
   * @param allocate Function to allocate a ByteBuffer of a requested capacity.
   * @throws IOException if an I/O error occurs.
   * @throws UnsupportedOperationException if any two ranges overlap.
   */
  public void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
      throws IOException {

    List<? extends FileRange> sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges);
    for (FileRange range : ranges) {
      VectoredReadUtils.validateRangeRequest(range);
      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
      range.setData(result);
    }

    if (VectoredReadUtils.isOrderedDisjoint(ranges, 1, minSeekForVectorReads)) {
      // Ranges are already far enough apart: issue one parallel read per range.
      for (FileRange range : sortedRanges) {
        ByteBuffer buffer = allocate.apply(range.getLength());
        boundedThreadPool.submit(() -> readSingleRange(range, buffer));
        // NOTE(review): stats are counted at submission time, before the read completes;
        // a failed read is still counted.
        statistics.incrementBytesRead(range.getLength());
        statistics.incrementReadOps(1);
      }
    } else {
      // Merge nearby ranges into fewer, larger reads, then slice results back out.
      List<CombinedFileRange> combinedFileRanges =
          VectoredReadUtils.mergeSortedRanges(
              sortedRanges, 1, minSeekForVectorReads, maxReadSizeForVectorReads);
      for (CombinedFileRange combinedFileRange : combinedFileRanges) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        combinedFileRange.setData(result);

        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
        boundedThreadPool.submit(() -> readSingleRange(combinedFileRange, buffer));
      }

      // NOTE(review): awaiting here makes the merged path synchronous for the caller,
      // unlike the disjoint path above which returns immediately.
      for (CombinedFileRange combinedFileRange : combinedFileRanges) {
        try {
          ByteBuffer combinedBuffer = FutureIO.awaitFuture(combinedFileRange.getData());
          for (FileRange child : combinedFileRange.getUnderlying()) {
            updateOriginalRange(child, combinedBuffer, combinedFileRange);
          }
        } catch (Exception ex) {
          logger.atSevere().withCause(ex).log(
              "Exception occurred while reading combined range from file");
          // Propagate the failure to every child range that depended on this combined read.
          for (FileRange child : combinedFileRange.getUnderlying()) {
            child.getData().completeExceptionally(ex);
          }
        }
        statistics.incrementBytesRead(combinedFileRange.getLength());
        statistics.incrementReadOps(1);
      }
    }
  }

  /**
   * Returns modified GoogleCloudStorageReadOptions with vectored read options set.
   *
   * @param readOptions The original read options.
   * @return The modified read options with vectored read options set.
   */
  private GoogleCloudStorageReadOptions getReadVectoredOptions(
      GoogleCloudStorageReadOptions readOptions) {
    GoogleCloudStorageReadOptions.Builder builder = readOptions.toBuilder();
    builder.setFastFailOnNotFoundEnabled(false);
    builder.setFadvise(GoogleCloudStorageReadOptions.Fadvise.SEQUENTIAL);
    return builder.build();
  }

  /**
   * Read data from GCS for this range and populate the buffer, completing the range's future
   * with the filled buffer (position 0, limit == range length) on success, or exceptionally on
   * failure.
   *
   * @param range range of data to read.
   * @param buffer buffer to fill.
   */
  private void readSingleRange(FileRange range, ByteBuffer buffer) {
    try {
      VectoredReadUtils.validateRangeRequest(range);
      try (SeekableByteChannel channel = ghfs.getGcsFs().open(gcsPath, readVectoredOptions)) {
        channel.position(range.getOffset());
        // Read directly into the caller-supplied buffer. The previous implementation wrapped
        // buffer.array(), which throws UnsupportedOperationException for direct buffers, and
        // it issued a single read(): SeekableByteChannel.read() may return fewer bytes than
        // requested, so loop until the range is satisfied or EOF is hit.
        buffer.position(0).limit(range.getLength());
        while (buffer.hasRemaining()) {
          if (channel.read(buffer) < 0) {
            break; // EOF before the range was fully read.
          }
        }
        if (buffer.hasRemaining()) {
          throw new IOException(
              String.format(
                  "EOF reached before reading %d bytes at offset %d of %s; only %d bytes read",
                  range.getLength(), range.getOffset(), gcsPath, buffer.position()));
        }
        buffer.flip(); // position=0, limit=range length: ready for the consumer.
        range.getData().complete(buffer);
        // Fine-grained success logging: per-range atInfo was too noisy on the hot path.
        logger.atFine().log(
            "Read single range completed from offset: %d with a length of %d",
            range.getOffset(), range.getLength());
      }
    } catch (Exception ex) {
      logger.atWarning().withCause(ex).log("Exception while reading a range %s", range);
      range.getData().completeExceptionally(ex);
    }
  }

  /**
   * Update data in child range from combined range.
   *
   * @param child child range.
   * @param combinedBuffer combined buffer.
   * @param combinedFileRange combined range.
   */
  private void updateOriginalRange(
      FileRange child, ByteBuffer combinedBuffer, CombinedFileRange combinedFileRange) {
    ByteBuffer childBuffer =
        VectoredReadUtils.sliceTo(combinedBuffer, combinedFileRange.getOffset(), child);
    child.getData().complete(childBuffer);
  }

  /**
   * Validates that the input ranges do not overlap and returns them sorted by offset. Two
   * ranges overlap when the start offset of the second is less than the end offset of the
   * first, where end offset is start offset + length.
   *
   * @param input list of input ranges.
   * @return the input ranges sorted by ascending offset.
   * @throws UnsupportedOperationException if any two ranges overlap.
   */
  private List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
      List<? extends FileRange> input) {

    if (input.size() <= 1) {
      return input;
    }
    FileRange[] sortedRanges = VectoredReadUtils.sortRanges(input);
    FileRange prev = sortedRanges[0];
    for (int i = 1; i < sortedRanges.length; i++) {
      if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
        throw new UnsupportedOperationException("Overlapping ranges are not supported");
      }
      prev = sortedRanges[i];
    }
    return Arrays.asList(sortedRanges);
  }

  /** Closes the VectoredIOImpl instance, releasing any allocated resources. */
  public void close() {
    boundedThreadPool.shutdown();
    try {
      // Give in-flight reads a bounded grace period before forcing shutdown.
      if (!boundedThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
        logger.atWarning().log(
            "Executor did not terminate within timeout. Forcibly shutting down.");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag before propagating, per InterruptedException convention.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      boundedThreadPool.shutdownNow();
    }
  }
}
Loading