Merge changes I69f41b23,Ib740de5e

* changes:
  libsnapshot: Replace IByteSink usage in inspect_cow.
  libsnapshot: Deprecate the IByteSink API.
David Anderson authored on 2023-04-27 14:28:28 +00:00, committed by Gerrit Code Review
commit cd21e051bd
9 changed files with 513 additions and 189 deletions


@ -15,7 +15,9 @@
#pragma once
#include <stdint.h>
#include <string>
#include <optional>
#include <string_view>
namespace android {
namespace snapshot {
@ -196,5 +198,8 @@ bool IsMetadataOp(const CowOperation& op);
// Ops that have dependencies on old blocks, and must take care in their merge order
bool IsOrderedOp(const CowOperation& op);
// Convert compression name to internal value.
std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
} // namespace snapshot
} // namespace android


@ -65,6 +65,7 @@ class ICowReader {
// Return the file header.
virtual bool GetHeader(CowHeader* header) = 0;
virtual CowHeader& GetHeader() = 0;
// Return the file footer.
virtual bool GetFooter(CowFooter* footer) = 0;
@ -85,6 +86,19 @@ class ICowReader {
// Get decoded bytes from the data section, handling any decompression.
// All retrieved data is passed to the sink.
virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0;
// Get decoded bytes from the data section, handling any decompression.
//
// If ignore_bytes is non-zero, it specifies the initial number of bytes
// to skip writing to |buffer|.
//
// Returns the number of bytes written to |buffer|, or -1 on failure.
// errno is NOT set.
//
// Partial reads are not possible unless |buffer_size| is less than the
// operation block size.
virtual ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
size_t ignore_bytes = 0) = 0;
};
// Iterate over a sequence of COW operations.
@ -140,6 +154,10 @@ class CowReader final : public ICowReader {
std::unique_ptr<ICowOpIter> GetMergeOpIter(bool ignore_progress = false) override;
bool ReadData(const CowOperation& op, IByteSink* sink) override;
ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
size_t ignore_bytes = 0) override;
CowHeader& GetHeader() override { return header_; }
bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read);
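For orientation, a minimal sketch (not part of the change) of how a caller drives the new buffer-based overload; |reader|, |op|, and the 4096-byte block size are assumed stand-ins for a parsed CowReader, a replace operation from its iterator, and the image's block size:

std::string block(4096, '\0');
ssize_t n = reader.ReadData(*op, block.data(), block.size());
if (n < 0) {
    LOG(ERROR) << "ReadData failed";  // errno is NOT set
} else if (static_cast<size_t>(n) != block.size()) {
    // Only possible when the buffer is smaller than the operation's block.
    LOG(ERROR) << "short read: " << n;
}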


@ -24,6 +24,7 @@
#include <gtest/gtest.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include "cow_decompress.h"
using testing::AssertionFailure;
using testing::AssertionResult;
@ -44,23 +45,10 @@ class CowTest : public ::testing::Test {
std::unique_ptr<TemporaryFile> cow_;
};
// Sink that always appends to the end of a string.
class StringSink : public IByteSink {
public:
void* GetBuffer(size_t requested, size_t* actual) override {
size_t old_size = stream_.size();
stream_.resize(old_size + requested, '\0');
*actual = requested;
return stream_.data() + old_size;
}
bool ReturnData(void*, size_t) override { return true; }
void Reset() { stream_.clear(); }
std::string& stream() { return stream_; }
private:
std::string stream_;
};
// Helper to check read sizes.
static inline bool ReadData(CowReader& reader, const CowOperation& op, void* buffer, size_t size) {
return reader.ReadData(op, buffer, size) == size;
}
TEST_F(CowTest, CopyContiguous) {
CowOptions options;
@ -144,7 +132,7 @@ TEST_F(CowTest, ReadWrite) {
ASSERT_EQ(op->new_block, 10);
ASSERT_EQ(op->source, 20);
StringSink sink;
std::string sink(data.size(), '\0');
iter->Next();
ASSERT_FALSE(iter->Done());
@ -154,8 +142,8 @@ TEST_F(CowTest, ReadWrite) {
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 4096);
ASSERT_EQ(op->new_block, 50);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
iter->Next();
ASSERT_FALSE(iter->Done());
@ -222,7 +210,7 @@ TEST_F(CowTest, ReadWriteXor) {
ASSERT_EQ(op->new_block, 10);
ASSERT_EQ(op->source, 20);
StringSink sink;
std::string sink(data.size(), '\0');
iter->Next();
ASSERT_FALSE(iter->Done());
@ -233,8 +221,8 @@ TEST_F(CowTest, ReadWriteXor) {
ASSERT_EQ(op->data_length, 4096);
ASSERT_EQ(op->new_block, 50);
ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
iter->Next();
ASSERT_FALSE(iter->Done());
@ -285,22 +273,22 @@ TEST_F(CowTest, CompressGz) {
ASSERT_FALSE(iter->Done());
auto op = &iter->Get();
StringSink sink;
std::string sink(data.size(), '\0');
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_EQ(op->compression, kCowCompressGz);
ASSERT_EQ(op->data_length, 56); // compressed!
ASSERT_EQ(op->new_block, 50);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
iter->Next();
ASSERT_TRUE(iter->Done());
}
class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
TEST_P(CompressionRWTest, ThreadedBatchWrites) {
TEST_P(CompressionTest, ThreadedBatchWrites) {
CowOptions options;
options.compression = GetParam();
options.num_compress_threads = 2;
@ -342,31 +330,32 @@ TEST_P(CompressionRWTest, ThreadedBatchWrites) {
if (op->type == kCowXorOp) {
total_blocks += 1;
StringSink sink;
std::string sink(xor_data.size(), '\0');
ASSERT_EQ(op->new_block, 50);
ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), xor_data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, xor_data);
}
if (op->type == kCowReplaceOp) {
total_blocks += 1;
if (op->new_block == 100) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
data.resize(options.block_size);
ASSERT_EQ(sink.stream(), data);
std::string sink(data.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink.size(), data.size());
ASSERT_EQ(sink, data);
}
if (op->new_block == 6000) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
data2.resize(options.block_size);
ASSERT_EQ(sink.stream(), data2);
std::string sink(data2.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data2);
}
if (op->new_block == 9000) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data3);
std::string sink(data3.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data3);
}
}
@ -376,7 +365,7 @@ TEST_P(CompressionRWTest, ThreadedBatchWrites) {
ASSERT_EQ(total_blocks, expected_blocks);
}
TEST_P(CompressionRWTest, NoBatchWrites) {
TEST_P(CompressionTest, NoBatchWrites) {
CowOptions options;
options.compression = GetParam();
options.num_compress_threads = 1;
@ -416,21 +405,21 @@ TEST_P(CompressionRWTest, NoBatchWrites) {
if (op->type == kCowReplaceOp) {
total_blocks += 1;
if (op->new_block == 50) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
data.resize(options.block_size);
ASSERT_EQ(sink.stream(), data);
std::string sink(data.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
}
if (op->new_block == 3000) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
data2.resize(options.block_size);
ASSERT_EQ(sink.stream(), data2);
std::string sink(data2.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data2);
}
if (op->new_block == 5000) {
StringSink sink;
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data3);
std::string sink(data3.size(), '\0');
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data3);
}
}
@ -440,7 +429,66 @@ TEST_P(CompressionRWTest, NoBatchWrites) {
ASSERT_EQ(total_blocks, expected_blocks);
}
INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
template <typename T>
class HorribleStream : public IByteStream {
public:
HorribleStream(const std::basic_string<T>& input) : input_(input) {}
ssize_t Read(void* buffer, size_t length) override {
if (pos_ >= input_.size()) {
return 0;
}
if (length) {
*reinterpret_cast<char*>(buffer) = input_[pos_];
}
pos_++;
return 1;
}
size_t Size() const override { return input_.size(); }
private:
std::basic_string<T> input_;
size_t pos_ = 0;
};
TEST(HorribleStream, ReadFully) {
std::string expected = "this is some data";
HorribleStream<char> stream(expected);
std::string buffer(expected.size(), '\0');
ASSERT_TRUE(stream.ReadFully(buffer.data(), buffer.size()));
ASSERT_EQ(buffer, expected);
}
TEST_P(CompressionTest, HorribleStream) {
if (strcmp(GetParam(), "none") == 0) {
GTEST_SKIP();
}
auto algorithm = CompressionAlgorithmFromString(GetParam());
ASSERT_TRUE(algorithm.has_value());
std::string expected = "The quick brown fox jumps over the lazy dog.";
expected.resize(4096, '\0');
auto result = CompressWorker::Compress(*algorithm, expected.data(), expected.size());
ASSERT_FALSE(result.empty());
HorribleStream<uint8_t> stream(result);
auto decomp = IDecompressor::FromString(GetParam());
ASSERT_NE(decomp, nullptr);
decomp->set_stream(&stream);
expected = expected.substr(10, 500);
std::string buffer(expected.size(), '\0');
ASSERT_EQ(decomp->Decompress(buffer.data(), 500, 4096, 10), 500);
ASSERT_EQ(buffer, expected);
}
INSTANTIATE_TEST_SUITE_P(AllCompressors, CompressionTest,
testing::Values("none", "gz", "brotli", "lz4"));
TEST_F(CowTest, ClusterCompressGz) {
CowOptions options;
@ -470,14 +518,14 @@ TEST_F(CowTest, ClusterCompressGz) {
ASSERT_FALSE(iter->Done());
auto op = &iter->Get();
StringSink sink;
std::string sink(data.size(), '\0');
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_EQ(op->compression, kCowCompressGz);
ASSERT_EQ(op->data_length, 56); // compressed!
ASSERT_EQ(op->new_block, 50);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
iter->Next();
ASSERT_FALSE(iter->Done());
@ -489,12 +537,13 @@ TEST_F(CowTest, ClusterCompressGz) {
ASSERT_FALSE(iter->Done());
op = &iter->Get();
sink.Reset();
sink = {};
sink.resize(data2.size(), '\0');
ASSERT_EQ(op->compression, kCowCompressGz);
ASSERT_EQ(op->data_length, 41); // compressed!
ASSERT_EQ(op->new_block, 51);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data2);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data2);
iter->Next();
ASSERT_FALSE(iter->Done());
@ -531,55 +580,15 @@ TEST_F(CowTest, CompressTwoBlocks) {
iter->Next();
ASSERT_FALSE(iter->Done());
StringSink sink;
std::string sink(options.block_size, '\0');
auto op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_EQ(op->compression, kCowCompressGz);
ASSERT_EQ(op->new_block, 51);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
}
// Only return 1-byte buffers, to stress test the partial read logic in
// CowReader.
class HorribleStringSink : public StringSink {
public:
void* GetBuffer(size_t, size_t* actual) override { return StringSink::GetBuffer(1, actual); }
};
class CompressionTest : public CowTest, public testing::WithParamInterface<const char*> {};
TEST_P(CompressionTest, HorribleSink) {
CowOptions options;
options.compression = GetParam();
options.cluster_ops = 0;
CowWriter writer(options);
ASSERT_TRUE(writer.Initialize(cow_->fd));
std::string data = "This is some data, believe it";
data.resize(options.block_size, '\0');
ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));
ASSERT_TRUE(writer.Finalize());
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
ASSERT_FALSE(iter->Done());
HorribleStringSink sink;
auto op = &iter->Get();
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
}
INSTANTIATE_TEST_SUITE_P(CowApi, CompressionTest, testing::Values("none", "gz", "brotli"));
TEST_F(CowTest, GetSize) {
CowOptions options;
options.cluster_ops = 0;
@ -641,7 +650,7 @@ TEST_F(CowTest, AppendLabelSmall) {
ASSERT_TRUE(reader.GetLastLabel(&label));
ASSERT_EQ(label, 3);
StringSink sink;
std::string sink(data.size(), '\0');
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -649,11 +658,12 @@ TEST_F(CowTest, AppendLabelSmall) {
ASSERT_FALSE(iter->Done());
auto op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data);
iter->Next();
sink.Reset();
sink = {};
sink.resize(data2.size(), '\0');
ASSERT_FALSE(iter->Done());
op = &iter->Get();
@ -665,8 +675,8 @@ TEST_F(CowTest, AppendLabelSmall) {
ASSERT_FALSE(iter->Done());
op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data2);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data2);
iter->Next();
ASSERT_TRUE(iter->Done());
@ -705,8 +715,6 @@ TEST_F(CowTest, AppendLabelMissing) {
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
StringSink sink;
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -765,8 +773,6 @@ TEST_F(CowTest, AppendExtendedCorrupted) {
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
StringSink sink;
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -816,7 +822,7 @@ TEST_F(CowTest, AppendbyLabel) {
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
StringSink sink;
std::string sink(options.block_size, '\0');
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -824,20 +830,20 @@ TEST_F(CowTest, AppendbyLabel) {
ASSERT_FALSE(iter->Done());
auto op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data.substr(0, options.block_size));
iter->Next();
sink.Reset();
sink = {};
sink.resize(options.block_size, '\0');
ASSERT_FALSE(iter->Done());
op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data.substr(options.block_size, 2 * options.block_size));
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data.substr(options.block_size, 2 * options.block_size));
iter->Next();
sink.Reset();
ASSERT_FALSE(iter->Done());
op = &iter->Get();
@ -897,7 +903,7 @@ TEST_F(CowTest, ClusterTest) {
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
StringSink sink;
std::string sink(data.size(), '\0');
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -905,11 +911,10 @@ TEST_F(CowTest, ClusterTest) {
ASSERT_FALSE(iter->Done());
auto op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data.substr(0, options.block_size));
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data.substr(0, options.block_size));
iter->Next();
sink.Reset();
ASSERT_FALSE(iter->Done());
op = &iter->Get();
@ -997,7 +1002,7 @@ TEST_F(CowTest, ClusterAppendTest) {
ASSERT_TRUE(reader.GetLastLabel(&label));
ASSERT_EQ(label, 50);
StringSink sink;
std::string sink(data2.size(), '\0');
auto iter = reader.GetOpIter();
ASSERT_NE(iter, nullptr);
@ -1012,8 +1017,8 @@ TEST_F(CowTest, ClusterAppendTest) {
ASSERT_FALSE(iter->Done());
op = &iter->Get();
ASSERT_EQ(op->type, kCowReplaceOp);
ASSERT_TRUE(reader.ReadData(*op, &sink));
ASSERT_EQ(sink.stream(), data2);
ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size()));
ASSERT_EQ(sink, data2);
iter->Next();
@ -1066,13 +1071,13 @@ AssertionResult CompareDataBlock(CowReader* reader, const CowOperation& op,
std::string cmp = data;
cmp.resize(header.block_size, '\0');
StringSink sink;
if (!reader->ReadData(op, &sink)) {
std::string sink(cmp.size(), '\0');
if (reader->ReadData(op, sink.data(), sink.size()) < 0) {
return AssertionFailure() << "Failed to read data block";
}
if (cmp != sink.stream()) {
if (cmp != sink) {
return AssertionFailure() << "Data blocks did not match, expected " << cmp << ", got "
<< sink.stream();
<< sink;
}
return AssertionSuccess();


@ -32,6 +32,21 @@
namespace android {
namespace snapshot {
std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name) {
if (name == "gz") {
return {kCowCompressGz};
} else if (name == "brotli") {
return {kCowCompressBrotli};
} else if (name == "lz4") {
return {kCowCompressLz4};
} else if (name == "none" || name.empty()) {
return {kCowCompressNone};
} else {
return {};
}
}
std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
return Compress(compression_, data, length);
}
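As a quick illustration of the mapping above (hypothetical checks, not taken from the change; CHECK is android-base's assertion macro):

CHECK(CompressionAlgorithmFromString("gz") == kCowCompressGz);
CHECK(CompressionAlgorithmFromString("") == kCowCompressNone);   // empty string means "none"
CHECK(!CompressionAlgorithmFromString("zstd").has_value());      // unknown names map to nullopt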


@ -16,6 +16,7 @@
#include "cow_decompress.h"
#include <array>
#include <utility>
#include <android-base/logging.h>
@ -26,9 +27,50 @@
namespace android {
namespace snapshot {
ssize_t IByteStream::ReadFully(void* buffer, size_t buffer_size) {
size_t stream_remaining = Size();
char* buffer_start = reinterpret_cast<char*>(buffer);
char* buffer_pos = buffer_start;
size_t buffer_remaining = buffer_size;
while (stream_remaining) {
const size_t to_read = std::min(buffer_remaining, stream_remaining);
const ssize_t actual_read = Read(buffer_pos, to_read);
if (actual_read < 0) {
return -1;
}
if (!actual_read) {
LOG(ERROR) << "Stream ended prematurely";
return -1;
}
CHECK_LE(actual_read, to_read);
stream_remaining -= actual_read;
buffer_pos += actual_read;
buffer_remaining -= actual_read;
}
return buffer_pos - buffer_start;
}
std::unique_ptr<IDecompressor> IDecompressor::FromString(std::string_view compressor) {
if (compressor == "lz4") {
return IDecompressor::Lz4();
} else if (compressor == "brotli") {
return IDecompressor::Brotli();
} else if (compressor == "gz") {
return IDecompressor::Gz();
} else {
return nullptr;
}
}
class NoDecompressor final : public IDecompressor {
public:
bool Decompress(size_t) override;
ssize_t Decompress(void*, size_t, size_t, size_t) override {
LOG(ERROR) << "Not supported";
return -1;
}
};
bool NoDecompressor::Decompress(size_t) {
@ -45,8 +87,8 @@ bool NoDecompressor::Decompress(size_t) {
uint8_t* buffer_pos = buffer;
size_t bytes_to_read = std::min(buffer_size, stream_remaining);
while (bytes_to_read) {
size_t read;
if (!stream_->Read(buffer_pos, bytes_to_read, &read)) {
ssize_t read = stream_->Read(buffer_pos, bytes_to_read);
if (read < 0) {
return false;
}
if (!read) {
@ -73,10 +115,13 @@ std::unique_ptr<IDecompressor> IDecompressor::Uncompressed() {
class StreamDecompressor : public IDecompressor {
public:
bool Decompress(size_t output_bytes) override;
ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
size_t ignore_bytes) override;
virtual bool Init() = 0;
virtual bool DecompressInput(const uint8_t* data, size_t length) = 0;
virtual bool Done() = 0;
virtual bool PartialDecompress(const uint8_t* data, size_t length) = 0;
bool OutputFull() const { return !ignore_bytes_ && !output_buffer_remaining_; }
protected:
bool GetFreshBuffer();
@ -85,6 +130,8 @@ class StreamDecompressor : public IDecompressor {
size_t stream_remaining_;
uint8_t* output_buffer_ = nullptr;
size_t output_buffer_remaining_ = 0;
size_t ignore_bytes_ = 0;
bool decompressor_ended_ = false;
};
static constexpr size_t kChunkSize = 4096;
@ -99,8 +146,9 @@ bool StreamDecompressor::Decompress(size_t output_bytes) {
uint8_t chunk[kChunkSize];
while (stream_remaining_) {
size_t read = std::min(stream_remaining_, sizeof(chunk));
if (!stream_->Read(chunk, read, &read)) {
size_t max_read = std::min(stream_remaining_, sizeof(chunk));
ssize_t read = stream_->Read(chunk, max_read);
if (read < 0) {
return false;
}
if (!read) {
@ -113,18 +161,65 @@ bool StreamDecompressor::Decompress(size_t output_bytes) {
stream_remaining_ -= read;
if (stream_remaining_ && Done()) {
if (stream_remaining_ && decompressor_ended_) {
LOG(ERROR) << "Decompressor terminated early";
return false;
}
}
if (!Done()) {
if (!decompressor_ended_) {
LOG(ERROR) << "Decompressor expected more bytes";
return false;
}
return true;
}
ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t,
size_t ignore_bytes) {
if (!Init()) {
return -1;
}
stream_remaining_ = stream_->Size();
output_buffer_ = reinterpret_cast<uint8_t*>(buffer);
output_buffer_remaining_ = buffer_size;
ignore_bytes_ = ignore_bytes;
uint8_t chunk[kChunkSize];
while (stream_remaining_ && output_buffer_remaining_ && !decompressor_ended_) {
size_t max_read = std::min(stream_remaining_, sizeof(chunk));
ssize_t read = stream_->Read(chunk, max_read);
if (read < 0) {
return -1;
}
if (!read) {
LOG(ERROR) << "Stream ended prematurely";
return -1;
}
if (!PartialDecompress(chunk, read)) {
return -1;
}
stream_remaining_ -= read;
}
if (stream_remaining_) {
if (decompressor_ended_ && !OutputFull()) {
// If there's more input in the stream, but we haven't finished
// consuming ignored bytes or available output space yet, then
// something weird happened. Report it and fail.
LOG(ERROR) << "Decompressor terminated early";
return -1;
}
} else {
if (!decompressor_ended_ && !OutputFull()) {
// The stream ended, but the decoder doesn't think so, and there are
// more bytes in the output buffer.
LOG(ERROR) << "Decompressor expected more bytes";
return -1;
}
}
return buffer_size - output_buffer_remaining_;
}
bool StreamDecompressor::GetFreshBuffer() {
size_t request_size = std::min(output_bytes_, kChunkSize);
output_buffer_ =
@ -142,11 +237,10 @@ class GzDecompressor final : public StreamDecompressor {
bool Init() override;
bool DecompressInput(const uint8_t* data, size_t length) override;
bool Done() override { return ended_; }
bool PartialDecompress(const uint8_t* data, size_t length) override;
private:
z_stream z_ = {};
bool ended_ = false;
};
bool GzDecompressor::Init() {
@ -198,7 +292,61 @@ bool GzDecompressor::DecompressInput(const uint8_t* data, size_t length) {
LOG(ERROR) << "Gz stream ended prematurely";
return false;
}
ended_ = true;
decompressor_ended_ = true;
return true;
}
}
return true;
}
bool GzDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
z_.next_in = reinterpret_cast<Bytef*>(const_cast<uint8_t*>(data));
z_.avail_in = length;
// If we're asked to ignore starting bytes, we decode those into a scratch
// buffer repeatedly until there is nothing left to ignore.
while (ignore_bytes_ && z_.avail_in) {
std::array<Bytef, kChunkSize> ignore_buffer;
size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
z_.next_out = ignore_buffer.data();
z_.avail_out = max_ignore;
int rv = inflate(&z_, Z_NO_FLUSH);
if (rv != Z_OK && rv != Z_STREAM_END) {
LOG(ERROR) << "inflate returned error code " << rv;
return false;
}
size_t returned = max_ignore - z_.avail_out;
CHECK_LE(returned, ignore_bytes_);
ignore_bytes_ -= returned;
if (rv == Z_STREAM_END) {
decompressor_ended_ = true;
return true;
}
}
z_.next_out = reinterpret_cast<Bytef*>(output_buffer_);
z_.avail_out = output_buffer_remaining_;
while (z_.avail_in && z_.avail_out) {
// Decompress.
int rv = inflate(&z_, Z_NO_FLUSH);
if (rv != Z_OK && rv != Z_STREAM_END) {
LOG(ERROR) << "inflate returned error code " << rv;
return false;
}
size_t returned = output_buffer_remaining_ - z_.avail_out;
CHECK_LE(returned, output_buffer_remaining_);
output_buffer_ += returned;
output_buffer_remaining_ -= returned;
if (rv == Z_STREAM_END) {
decompressor_ended_ = true;
return true;
}
}
@ -215,7 +363,7 @@ class BrotliDecompressor final : public StreamDecompressor {
bool Init() override;
bool DecompressInput(const uint8_t* data, size_t length) override;
bool Done() override { return BrotliDecoderIsFinished(decoder_); }
bool PartialDecompress(const uint8_t* data, size_t length) override;
private:
BrotliDecoderState* decoder_ = nullptr;
@ -257,6 +405,44 @@ bool BrotliDecompressor::DecompressInput(const uint8_t* data, size_t length) {
return true;
}
bool BrotliDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
size_t available_in = length;
const uint8_t* next_in = data;
while (available_in && ignore_bytes_ && !BrotliDecoderIsFinished(decoder_)) {
std::array<uint8_t, kChunkSize> ignore_buffer;
size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size());
size_t ignore_size = max_ignore;
uint8_t* ignore_buffer_ptr = ignore_buffer.data();
auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in, &ignore_size,
&ignore_buffer_ptr, nullptr);
if (r == BROTLI_DECODER_RESULT_ERROR) {
LOG(ERROR) << "brotli decode failed";
return false;
} else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
LOG(ERROR) << "brotli unexpected needs more input";
return false;
}
ignore_bytes_ -= max_ignore - ignore_size;
}
while (available_in && !BrotliDecoderIsFinished(decoder_)) {
auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in,
&output_buffer_remaining_, &output_buffer_, nullptr);
if (r == BROTLI_DECODER_RESULT_ERROR) {
LOG(ERROR) << "brotli decode failed";
return false;
} else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) {
LOG(ERROR) << "brotli unexpected needs more input";
return false;
}
}
decompressor_ended_ = BrotliDecoderIsFinished(decoder_);
return true;
}
std::unique_ptr<IDecompressor> IDecompressor::Brotli() {
return std::unique_ptr<IDecompressor>(new BrotliDecompressor());
}
@ -275,8 +461,7 @@ class Lz4Decompressor final : public IDecompressor {
}
// If input size is same as output size, then input is uncompressed.
if (stream_->Size() == output_size) {
size_t bytes_read = 0;
stream_->Read(output_buffer, output_size, &bytes_read);
ssize_t bytes_read = stream_->ReadFully(output_buffer, output_size);
if (bytes_read != output_size) {
LOG(ERROR) << "Failed to read all input at once. Expected: " << output_size
<< " actual: " << bytes_read;
@ -287,8 +472,7 @@ class Lz4Decompressor final : public IDecompressor {
}
std::string input_buffer;
input_buffer.resize(stream_->Size());
size_t bytes_read = 0;
stream_->Read(input_buffer.data(), input_buffer.size(), &bytes_read);
ssize_t bytes_read = stream_->ReadFully(input_buffer.data(), input_buffer.size());
if (bytes_read != input_buffer.size()) {
LOG(ERROR) << "Failed to read all input at once. Expected: " << input_buffer.size()
<< " actual: " << bytes_read;
@ -305,6 +489,61 @@ class Lz4Decompressor final : public IDecompressor {
sink_->ReturnData(output_buffer, output_size);
return true;
}
ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
size_t ignore_bytes) override {
std::string input_buffer(stream_->Size(), '\0');
ssize_t streamed_in = stream_->ReadFully(input_buffer.data(), input_buffer.size());
if (streamed_in < 0) {
return -1;
}
CHECK_EQ(streamed_in, stream_->Size());
char* decode_buffer = reinterpret_cast<char*>(buffer);
size_t decode_buffer_size = buffer_size;
// It's unclear if LZ4 can exactly satisfy a partial decode request, so
// if we get one, create a temporary buffer.
std::string temp;
if (buffer_size < decompressed_size) {
temp.resize(decompressed_size, '\0');
decode_buffer = temp.data();
decode_buffer_size = temp.size();
}
const int bytes_decompressed = LZ4_decompress_safe(input_buffer.data(), decode_buffer,
input_buffer.size(), decode_buffer_size);
if (bytes_decompressed < 0) {
LOG(ERROR) << "Failed to decompress LZ4 block, code: " << bytes_decompressed;
return -1;
}
if (bytes_decompressed != decompressed_size) {
LOG(ERROR) << "Failed to decompress LZ4 block, expected output size: "
<< decompressed_size << ", actual: " << bytes_decompressed;
return -1;
}
CHECK_LE(bytes_decompressed, decode_buffer_size);
if (ignore_bytes > bytes_decompressed) {
LOG(ERROR) << "Ignoring more bytes than exist in stream (ignoring " << ignore_bytes
<< ", got " << bytes_decompressed << ")";
return -1;
}
if (temp.empty()) {
// LZ4's API has no way to sink out the first N bytes of decoding,
// so we read them all in and memmove() to drop the partial read.
if (ignore_bytes) {
memmove(decode_buffer, decode_buffer + ignore_bytes,
bytes_decompressed - ignore_bytes);
}
return bytes_decompressed - ignore_bytes;
}
size_t max_copy = std::min(bytes_decompressed - ignore_bytes, buffer_size);
memcpy(buffer, temp.data() + ignore_bytes, max_copy);
return max_copy;
}
};
std::unique_ptr<IDecompressor> IDecompressor::Lz4() {


@ -26,11 +26,16 @@ class IByteStream {
virtual ~IByteStream() {}
// Read up to |length| bytes, storing the number of bytes read in the out-
// parameter. If the end of the stream is reached, 0 is returned.
virtual bool Read(void* buffer, size_t length, size_t* read) = 0;
// parameter. If the end of the stream is reached, 0 is returned. On error,
// -1 is returned. errno is NOT set.
virtual ssize_t Read(void* buffer, size_t length) = 0;
// Size of the stream.
virtual size_t Size() const = 0;
// Helper for Read(). Read the entire stream into |buffer|, up to |length|
// bytes.
ssize_t ReadFully(void* buffer, size_t length);
};
class IDecompressor {
@ -43,9 +48,21 @@ class IDecompressor {
static std::unique_ptr<IDecompressor> Brotli();
static std::unique_ptr<IDecompressor> Lz4();
static std::unique_ptr<IDecompressor> FromString(std::string_view compressor);
// |output_bytes| is the expected total number of bytes to sink.
virtual bool Decompress(size_t output_bytes) = 0;
// Decompress at most |buffer_size| bytes, ignoring the first |ignore_bytes|
// of the decoded stream. |buffer_size| must be at least one byte.
// |decompressed_size| is the expected total size if the entire stream were
// decompressed.
//
// Returns the number of bytes written to |buffer|, or -1 on error. errno
// is NOT set.
virtual ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
size_t ignore_bytes = 0) = 0;
void set_stream(IByteStream* stream) { stream_ = stream; }
void set_sink(IByteSink* sink) { sink_ = sink; }
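To make the new contract concrete, here is an illustrative in-memory IByteStream (not part of the patch; it assumes <algorithm>, <cstring>, and <string> are available): it reports its size and returns how many bytes each Read() produced, with 0 reserved for end-of-stream and -1 for errors.

class MemoryByteStream final : public IByteStream {
  public:
    explicit MemoryByteStream(const std::string& data) : data_(data) {}

    ssize_t Read(void* buffer, size_t length) override {
        size_t to_copy = std::min(length, data_.size() - pos_);
        memcpy(buffer, data_.data() + pos_, to_copy);
        pos_ += to_copy;
        return to_copy;  // 0 once the stream is exhausted; a real stream returns -1 on error
    }
    size_t Size() const override { return data_.size(); }

  private:
    std::string data_;
    size_t pos_ = 0;
};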


@ -747,18 +747,18 @@ class CowDataStream final : public IByteStream {
remaining_ = data_length_;
}
bool Read(void* buffer, size_t length, size_t* read) override {
ssize_t Read(void* buffer, size_t length) override {
size_t to_read = std::min(length, remaining_);
if (!to_read) {
*read = 0;
return true;
return 0;
}
if (!reader_->GetRawBytes(offset_, buffer, to_read, read)) {
return false;
size_t read;
if (!reader_->GetRawBytes(offset_, buffer, to_read, &read)) {
return -1;
}
offset_ += *read;
remaining_ -= *read;
return true;
offset_ += read;
remaining_ -= read;
return read;
}
size_t Size() const override { return data_length_; }
@ -802,5 +802,44 @@ bool CowReader::ReadData(const CowOperation& op, IByteSink* sink) {
return decompressor->Decompress(header_.block_size);
}
ssize_t CowReader::ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
size_t ignore_bytes) {
std::unique_ptr<IDecompressor> decompressor;
switch (op.compression) {
case kCowCompressNone:
break;
case kCowCompressGz:
decompressor = IDecompressor::Gz();
break;
case kCowCompressBrotli:
decompressor = IDecompressor::Brotli();
break;
case kCowCompressLz4:
if (header_.block_size != op.data_length) {
decompressor = IDecompressor::Lz4();
}
break;
default:
LOG(ERROR) << "Unknown compression type: " << op.compression;
return -1;
}
uint64_t offset;
if (op.type == kCowXorOp) {
offset = data_loc_->at(op.new_block);
} else {
offset = op.source;
}
if (!decompressor) {
CowDataStream stream(this, offset + ignore_bytes, op.data_length - ignore_bytes);
return stream.ReadFully(buffer, buffer_size);
}
CowDataStream stream(this, offset, op.data_length);
decompressor->set_stream(&stream);
return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
}
} // namespace snapshot
} // namespace android
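One consequence of the dispatch above, sketched with an assumed |reader| and a 4096-byte replace |op| (illustrative only): when the op is stored uncompressed, |ignore_bytes| simply shifts the raw read, whereas a gz/brotli/lz4 op still streams the whole block through a decompressor before the leading bytes are discarded.

std::string tail(2048, '\0');
// For kCowCompressNone this reads raw bytes [2048, 4096) of the op directly;
// for compressed ops the full block is decoded and the first 2048 bytes dropped.
ssize_t n = reader.ReadData(*op, tail.data(), tail.size(), /*ignore_bytes=*/2048);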


@ -37,6 +37,14 @@
#include <sys/ioctl.h>
#include <unistd.h>
// The info messages here are spammy, but are useful for update_engine. Disable
// them when running on the host.
#ifdef __ANDROID__
#define LOG_INFO LOG(INFO)
#else
#define LOG_INFO LOG(VERBOSE)
#endif
namespace android {
namespace snapshot {
@ -194,18 +202,13 @@ void CowWriter::SetupHeaders() {
}
bool CowWriter::ParseOptions() {
if (options_.compression == "gz") {
compression_ = kCowCompressGz;
} else if (options_.compression == "brotli") {
compression_ = kCowCompressBrotli;
} else if (options_.compression == "lz4") {
compression_ = kCowCompressLz4;
} else if (options_.compression == "none") {
compression_ = kCowCompressNone;
} else if (!options_.compression.empty()) {
auto algorithm = CompressionAlgorithmFromString(options_.compression);
if (!algorithm) {
LOG(ERROR) << "unrecognized compression: " << options_.compression;
return false;
}
compression_ = *algorithm;
if (options_.cluster_ops == 1) {
LOG(ERROR) << "Clusters must contain at least two operations to function.";
return false;
@ -239,10 +242,10 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
return false;
}
cow_image_size_ = size_in_bytes;
LOG(INFO) << "COW image " << file_path << " has size " << size_in_bytes;
LOG_INFO << "COW image " << file_path << " has size " << size_in_bytes;
} else {
LOG(INFO) << "COW image " << file_path
<< " is not a block device, assuming unlimited space.";
LOG_INFO << "COW image " << file_path
<< " is not a block device, assuming unlimited space.";
}
}
return true;
@ -271,12 +274,12 @@ void CowWriter::InitBatchWrites() {
}
std::string batch_write = batch_write_ ? "enabled" : "disabled";
LOG(INFO) << "Batch writes: " << batch_write;
LOG_INFO << "Batch writes: " << batch_write;
}
void CowWriter::InitWorkers() {
if (num_compress_threads_ <= 1) {
LOG(INFO) << "Not creating new threads for compression.";
LOG_INFO << "Not creating new threads for compression.";
return;
}
for (int i = 0; i < num_compress_threads_; i++) {
@ -285,7 +288,7 @@ void CowWriter::InitWorkers() {
compress_threads_.push_back(std::move(wt));
}
LOG(INFO) << num_compress_threads_ << " thread used for compression";
LOG_INFO << num_compress_threads_ << " threads used for compression";
}
bool CowWriter::Initialize(unique_fd&& fd) {


@ -63,24 +63,6 @@ struct Options {
bool include_merged;
};
// Sink that always appends to the end of a string.
class StringSink : public IByteSink {
public:
void* GetBuffer(size_t requested, size_t* actual) override {
size_t old_size = stream_.size();
stream_.resize(old_size + requested, '\0');
*actual = requested;
return stream_.data() + old_size;
}
bool ReturnData(void*, size_t) override { return true; }
void Reset() { stream_.clear(); }
std::string& stream() { return stream_; }
private:
std::string stream_;
};
static void ShowBad(CowReader& reader, const struct CowOperation& op) {
size_t count;
auto buffer = std::make_unique<uint8_t[]>(op.data_length);
@ -153,7 +135,9 @@ static bool Inspect(const std::string& path, Options opt) {
} else if (opt.iter_type == Merge) {
iter = reader.GetMergeOpIter(opt.include_merged);
}
StringSink sink;
std::string buffer(header.block_size, '\0');
bool success = true;
uint64_t xor_ops = 0, copy_ops = 0, replace_ops = 0, zero_ops = 0;
while (!iter->Done()) {
@ -162,12 +146,11 @@ static bool Inspect(const std::string& path, Options opt) {
if (!opt.silent && opt.show_ops) std::cout << op << "\n";
if (opt.decompress && op.type == kCowReplaceOp && op.compression != kCowCompressNone) {
if (!reader.ReadData(op, &sink)) {
if (reader.ReadData(op, buffer.data(), buffer.size()) < 0) {
std::cerr << "Failed to decompress for :" << op << "\n";
success = false;
if (opt.show_bad) ShowBad(reader, op);
}
sink.Reset();
}
if (op.type == kCowSequenceOp && opt.show_seq) {