Skip to content

Commit

Permalink
use folly::Expected as return code
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Apr 22, 2024
1 parent 37a319f commit 04becc9
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 205 deletions.
18 changes: 18 additions & 0 deletions velox/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <fmt/format.h>
#include <fmt/ostream.h>
#include <folly/Expected.h>
#include <folly/Likely.h>
#include <string>
#include <utility>
Expand Down Expand Up @@ -476,6 +477,23 @@ void Status::moveFrom(Status& s) {
VELOX_RETURN_IF(!__s.ok(), __s); \
} while (false)

/// Return with given status wrapped in folly::Unexpected if condition is met.
#define VELOX_RETURN_UNEXPECTED_IF(condition, status) \
do { \
if (FOLLY_UNLIKELY(condition)) { \
return (::folly::makeUnexpected(status)); \
} \
} while (0)

/// Propagate any non-successful Status wrapped in folly::Unexpected to the
/// caller.
#define VELOX_RETURN_UNEXPECTED(status) \
do { \
::facebook::velox::Status __s = \
::facebook::velox::internal::genericToStatus(status); \
VELOX_RETURN_IF(!__s.ok(), ::folly::makeUnexpected(__s)); \
} while (false)

namespace internal {

/// Common API for extracting Status from either Status or Result<T> (the latter
Expand Down
54 changes: 34 additions & 20 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
#pragma once

#include <fmt/format.h>
#include <folly/Expected.h>
#include <folly/compression/Compression.h>
#include <string>

#include "velox/common/base/Status.h"

namespace facebook::velox::common {

enum CompressionKind {
Expand Down Expand Up @@ -72,7 +75,7 @@ class StreamingCompressor {
/// Compress some input.
/// If CompressResult.outputTooSmall is true on return, then a larger output
/// buffer should be supplied.
virtual CompressResult compress(
virtual folly::Expected<CompressResult, Status> compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -81,13 +84,17 @@ class StreamingCompressor {
/// Flush part of the compressed output.
/// If FlushResult.outputTooSmall is true on return, flush() should be called
/// again with a larger buffer.
virtual FlushResult flush(uint8_t* output, uint64_t outputLength) = 0;
virtual folly::Expected<FlushResult, Status> flush(
uint8_t* output,
uint64_t outputLength) = 0;

/// End compressing, doing whatever is necessary to end the stream.
/// If EndResult.outputTooSmall is true on return, end() should be called
/// again with a larger buffer. Otherwise, the StreamingCompressor should not
/// be used anymore. end() will flush the compressed output.
virtual EndResult end(uint8_t* output, uint64_t outputLength) = 0;
virtual folly::Expected<EndResult, Status> end(
uint8_t* output,
uint64_t outputLength) = 0;
};

class StreamingDecompressor {
Expand All @@ -103,7 +110,7 @@ class StreamingDecompressor {
/// Decompress some input.
/// If outputTooSmall is true on return, a larger output buffer needs
/// to be supplied.
virtual DecompressResult decompress(
virtual folly::Expected<DecompressResult, Status> decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand All @@ -113,7 +120,7 @@ class StreamingDecompressor {
virtual bool isFinished() = 0;

/// Reinitialize decompressor, making it ready for a new compressed stream.
virtual void reset() = 0;
virtual Status reset() = 0;
};

struct CodecOptions {
Expand Down Expand Up @@ -180,25 +187,25 @@ class Codec {
/// `kUseDefaultCompressionLevel` will be returned.
virtual int32_t defaultCompressionLevel() const = 0;

/// One-shot decompression function.
/// `outputLength` must be correct and therefore be obtained in advance.
/// The actual decompressed length is returned.
/// Note: One-shot decompression is not always compatible with streaming
/// compression. Depending on the codec (e.g. LZ4), different formats may
/// Performs one-shot compression.
/// `outputLength` must first have been computed using maxCompressedLength().
/// The actual compressed length will be written to actualOutputLength.
/// Note: One-shot compression is not always compatible with streaming
/// decompression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual uint64_t decompress(
virtual folly::Expected<uint64_t, Status> compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) = 0;

/// Performs one-shot compression.
/// `outputLength` must first have been computed using maxCompressedLength().
/// The actual compressed length is returned.
/// Note: One-shot compression is not always compatible with streaming
/// decompression. Depending on the codec (e.g. LZ4), different formats may
/// One-shot decompression function.
/// `outputLength` must be correct and therefore be obtained in advance.
/// The actual decompressed length is returned.
/// Note: One-shot decompression is not always compatible with streaming
/// compression. Depending on the codec (e.g. LZ4), different formats may
/// be used.
virtual uint64_t compress(
virtual folly::Expected<uint64_t, Status> decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand Down Expand Up @@ -231,11 +238,18 @@ class Codec {
const uint8_t* input) const;

/// Create a streaming compressor instance.
virtual std::shared_ptr<StreamingCompressor> makeStreamingCompressor() = 0;
virtual folly::Expected<std::shared_ptr<StreamingCompressor>, Status>
makeStreamingCompressor() {
return folly::makeUnexpected(Status::Invalid(
"Streaming compression is unsupported with {} format.", name()));
}

/// Create a streaming compressor instance.
virtual std::shared_ptr<StreamingDecompressor>
makeStreamingDecompressor() = 0;
virtual folly::Expected<std::shared_ptr<StreamingDecompressor>, Status>
makeStreamingDecompressor() {
return folly::makeUnexpected(Status::Invalid(
"Streaming decompression is unsupported with {} format.", name()));
}

/// This Codec's compression type.
virtual CompressionKind compressionKind() const = 0;
Expand Down
12 changes: 5 additions & 7 deletions velox/common/compression/HadoopCompressionFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ bool HadoopCompressionFormat::tryDecompressHadoop(
return false;
}
// Try decompressing and compare with expected decompressed length.
try {
auto decompressedSize = decompressInternal(
input, expectedCompressedSize, output, outputLength);
if (decompressedSize != expectedDecompressedSize) {
return false;
}
} catch (const VeloxException& e) {
auto maybeDecompressedSize =
decompressInternal(input, expectedCompressedSize, output, outputLength);
if (maybeDecompressedSize.hasError() ||
maybeDecompressedSize.value() != expectedDecompressedSize) {
return false;
}

input += expectedCompressedSize;
inputLength -= expectedCompressedSize;
output += expectedDecompressedSize;
Expand Down
4 changes: 3 additions & 1 deletion velox/common/compression/HadoopCompressionFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#pragma once

#include <folly/Expected.h>
#include <cstdint>
#include "velox/common/base/Status.h"

namespace facebook::velox::common {

Expand All @@ -29,7 +31,7 @@ class HadoopCompressionFormat {
uint64_t outputLength,
uint64_t& actualDecompressedSize);

virtual uint64_t decompressInternal(
virtual folly::Expected<uint64_t, Status> decompressInternal(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
Expand Down
Loading

0 comments on commit 04becc9

Please sign in to comment.