Merge "libsnapshot: Remove the IByteSink API."

This commit is contained in:
David Anderson 2023-05-02 18:05:41 +00:00 committed by Gerrit Code Review
commit d71f1c7225
5 changed files with 5 additions and 287 deletions

View file

@ -29,35 +29,6 @@ namespace snapshot {
class ICowOpIter;
// A ByteSink object handles requests for a buffer of a specific size. It
// always owns the underlying buffer. It's designed to minimize potential
// copying as we parse or decompress the COW.
class IByteSink {
public:
virtual ~IByteSink() {}
// Called when the reader has data. The size of the request is given. The
// sink must return a valid pointer (or null on failure), and return the
// maximum number of bytes that can be written to the returned buffer.
//
// The returned buffer is owned by IByteSink, but must remain valid until
// the read operation has completed (or the entire buffer has been
// covered by calls to ReturnData).
//
// After calling GetBuffer(), all previous buffers returned are no longer
// valid.
//
// GetBuffer() is intended to be sequential. A returned size of N indicates
// that the output stream will advance by N bytes, and the ReturnData call
// indicates that those bytes have been fulfilled. Therefore, it is
// possible to have ReturnBuffer do nothing, if the implementation doesn't
// care about incremental writes.
virtual void* GetBuffer(size_t requested, size_t* actual) = 0;
// Called when a section returned by |GetBuffer| has been filled with data.
virtual bool ReturnData(void* buffer, size_t length) = 0;
};
// Interface for reading from a snapuserd COW.
class ICowReader {
public:
@ -83,10 +54,6 @@ class ICowReader {
// Return an iterator for retrieving CowOperation entries in merge order
virtual std::unique_ptr<ICowOpIter> GetMergeOpIter(bool ignore_progress) = 0;
// Get decoded bytes from the data section, handling any decompression.
// All retrieved data is passed to the sink.
virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0;
// Get decoded bytes from the data section, handling any decompression.
//
// If ignore_bytes is non-zero, it specifies the initial number of bytes
@ -153,7 +120,6 @@ class CowReader final : public ICowReader {
std::unique_ptr<ICowOpIter> GetRevMergeOpIter(bool ignore_progress = false) override;
std::unique_ptr<ICowOpIter> GetMergeOpIter(bool ignore_progress = false) override;
bool ReadData(const CowOperation& op, IByteSink* sink) override;
ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
size_t ignore_bytes = 0) override;

View file

@ -64,69 +64,17 @@ std::unique_ptr<IDecompressor> IDecompressor::FromString(std::string_view compre
}
}
class NoDecompressor final : public IDecompressor {
public:
bool Decompress(size_t) override;
ssize_t Decompress(void*, size_t, size_t, size_t) override {
LOG(ERROR) << "Not supported";
return -1;
}
};
bool NoDecompressor::Decompress(size_t) {
size_t stream_remaining = stream_->Size();
while (stream_remaining) {
size_t buffer_size = stream_remaining;
uint8_t* buffer = reinterpret_cast<uint8_t*>(sink_->GetBuffer(buffer_size, &buffer_size));
if (!buffer) {
LOG(ERROR) << "Could not acquire buffer from sink";
return false;
}
// Read until we can fill the buffer.
uint8_t* buffer_pos = buffer;
size_t bytes_to_read = std::min(buffer_size, stream_remaining);
while (bytes_to_read) {
ssize_t read = stream_->Read(buffer_pos, bytes_to_read);
if (read < 0) {
return false;
}
if (!read) {
LOG(ERROR) << "Stream ended prematurely";
return false;
}
if (!sink_->ReturnData(buffer_pos, read)) {
LOG(ERROR) << "Could not return buffer to sink";
return false;
}
buffer_pos += read;
bytes_to_read -= read;
stream_remaining -= read;
}
}
return true;
}
std::unique_ptr<IDecompressor> IDecompressor::Uncompressed() {
return std::unique_ptr<IDecompressor>(new NoDecompressor());
}
// Read chunks of the COW and incrementally stream them to the decoder.
class StreamDecompressor : public IDecompressor {
public:
bool Decompress(size_t output_bytes) override;
ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
size_t ignore_bytes) override;
virtual bool Init() = 0;
virtual bool DecompressInput(const uint8_t* data, size_t length) = 0;
virtual bool PartialDecompress(const uint8_t* data, size_t length) = 0;
bool OutputFull() const { return !ignore_bytes_ && !output_buffer_remaining_; }
protected:
bool GetFreshBuffer();
size_t output_bytes_;
size_t stream_remaining_;
uint8_t* output_buffer_ = nullptr;
size_t output_buffer_remaining_ = 0;
@ -136,43 +84,6 @@ class StreamDecompressor : public IDecompressor {
static constexpr size_t kChunkSize = 4096;
bool StreamDecompressor::Decompress(size_t output_bytes) {
if (!Init()) {
return false;
}
stream_remaining_ = stream_->Size();
output_bytes_ = output_bytes;
uint8_t chunk[kChunkSize];
while (stream_remaining_) {
size_t max_read = std::min(stream_remaining_, sizeof(chunk));
ssize_t read = stream_->Read(chunk, max_read);
if (read < 0) {
return false;
}
if (!read) {
LOG(ERROR) << "Stream ended prematurely";
return false;
}
if (!DecompressInput(chunk, read)) {
return false;
}
stream_remaining_ -= read;
if (stream_remaining_ && decompressor_ended_) {
LOG(ERROR) << "Decompressor terminated early";
return false;
}
}
if (!decompressor_ended_) {
LOG(ERROR) << "Decompressor expected more bytes";
return false;
}
return true;
}
ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t,
size_t ignore_bytes) {
if (!Init()) {
@ -220,23 +131,11 @@ ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t,
return buffer_size - output_buffer_remaining_;
}
bool StreamDecompressor::GetFreshBuffer() {
size_t request_size = std::min(output_bytes_, kChunkSize);
output_buffer_ =
reinterpret_cast<uint8_t*>(sink_->GetBuffer(request_size, &output_buffer_remaining_));
if (!output_buffer_) {
LOG(ERROR) << "Could not acquire buffer from sink";
return false;
}
return true;
}
class GzDecompressor final : public StreamDecompressor {
public:
~GzDecompressor();
bool Init() override;
bool DecompressInput(const uint8_t* data, size_t length) override;
bool PartialDecompress(const uint8_t* data, size_t length) override;
private:
@ -255,50 +154,6 @@ GzDecompressor::~GzDecompressor() {
inflateEnd(&z_);
}
bool GzDecompressor::DecompressInput(const uint8_t* data, size_t length) {
z_.next_in = reinterpret_cast<Bytef*>(const_cast<uint8_t*>(data));
z_.avail_in = length;
while (z_.avail_in) {
// If no more output buffer, grab a new buffer.
if (z_.avail_out == 0) {
if (!GetFreshBuffer()) {
return false;
}
z_.next_out = reinterpret_cast<Bytef*>(output_buffer_);
z_.avail_out = output_buffer_remaining_;
}
// Remember the position of the output buffer so we can call ReturnData.
auto avail_out = z_.avail_out;
// Decompress.
int rv = inflate(&z_, Z_NO_FLUSH);
if (rv != Z_OK && rv != Z_STREAM_END) {
LOG(ERROR) << "inflate returned error code " << rv;
return false;
}
size_t returned = avail_out - z_.avail_out;
if (!sink_->ReturnData(output_buffer_, returned)) {
LOG(ERROR) << "Could not return buffer to sink";
return false;
}
output_buffer_ += returned;
output_buffer_remaining_ -= returned;
if (rv == Z_STREAM_END) {
if (z_.avail_in) {
LOG(ERROR) << "Gz stream ended prematurely";
return false;
}
decompressor_ended_ = true;
return true;
}
}
return true;
}
bool GzDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
z_.next_in = reinterpret_cast<Bytef*>(const_cast<uint8_t*>(data));
z_.avail_in = length;
@ -362,7 +217,6 @@ class BrotliDecompressor final : public StreamDecompressor {
~BrotliDecompressor();
bool Init() override;
bool DecompressInput(const uint8_t* data, size_t length) override;
bool PartialDecompress(const uint8_t* data, size_t length) override;
private:
@ -380,31 +234,6 @@ BrotliDecompressor::~BrotliDecompressor() {
}
}
bool BrotliDecompressor::DecompressInput(const uint8_t* data, size_t length) {
size_t available_in = length;
const uint8_t* next_in = data;
bool needs_more_output = false;
while (available_in || needs_more_output) {
if (!output_buffer_remaining_ && !GetFreshBuffer()) {
return false;
}
auto output_buffer = output_buffer_;
auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in,
&output_buffer_remaining_, &output_buffer_, nullptr);
if (r == BROTLI_DECODER_RESULT_ERROR) {
LOG(ERROR) << "brotli decode failed";
return false;
}
if (!sink_->ReturnData(output_buffer, output_buffer_ - output_buffer)) {
return false;
}
needs_more_output = (r == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT);
}
return true;
}
bool BrotliDecompressor::PartialDecompress(const uint8_t* data, size_t length) {
size_t available_in = length;
const uint8_t* next_in = data;
@ -451,45 +280,6 @@ class Lz4Decompressor final : public IDecompressor {
public:
~Lz4Decompressor() override = default;
bool Decompress(const size_t output_size) override {
size_t actual_buffer_size = 0;
auto&& output_buffer = sink_->GetBuffer(output_size, &actual_buffer_size);
if (actual_buffer_size != output_size) {
LOG(ERROR) << "Failed to allocate buffer of size " << output_size << " only got "
<< actual_buffer_size << " bytes";
return false;
}
// If input size is same as output size, then input is uncompressed.
if (stream_->Size() == output_size) {
ssize_t bytes_read = stream_->ReadFully(output_buffer, output_size);
if (bytes_read != output_size) {
LOG(ERROR) << "Failed to read all input at once. Expected: " << output_size
<< " actual: " << bytes_read;
return false;
}
sink_->ReturnData(output_buffer, output_size);
return true;
}
std::string input_buffer;
input_buffer.resize(stream_->Size());
ssize_t bytes_read = stream_->ReadFully(input_buffer.data(), input_buffer.size());
if (bytes_read != input_buffer.size()) {
LOG(ERROR) << "Failed to read all input at once. Expected: " << input_buffer.size()
<< " actual: " << bytes_read;
return false;
}
const int bytes_decompressed =
LZ4_decompress_safe(input_buffer.data(), static_cast<char*>(output_buffer),
input_buffer.size(), output_size);
if (bytes_decompressed != output_size) {
LOG(ERROR) << "Failed to decompress LZ4 block, expected output size: " << output_size
<< ", actual: " << bytes_decompressed;
return false;
}
sink_->ReturnData(output_buffer, output_size);
return true;
}
ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size,
size_t ignore_bytes) override {
std::string input_buffer(stream_->Size(), '\0');

View file

@ -50,9 +50,6 @@ class IDecompressor {
static std::unique_ptr<IDecompressor> FromString(std::string_view compressor);
// |output_bytes| is the expected total number of bytes to sink.
virtual bool Decompress(size_t output_bytes) = 0;
// Decompress at most |buffer_size| bytes, ignoring the first |ignore_bytes|
// of the decoded stream. |buffer_size| must be at least one byte.
// |decompressed_size| is the expected total size if the entire stream were
@ -64,11 +61,9 @@ class IDecompressor {
size_t ignore_bytes = 0) = 0;
void set_stream(IByteStream* stream) { stream_ = stream; }
void set_sink(IByteSink* sink) { sink_ = sink; }
protected:
IByteStream* stream_ = nullptr;
IByteSink* sink_ = nullptr;
};
} // namespace snapshot

View file

@ -770,38 +770,6 @@ class CowDataStream final : public IByteStream {
size_t remaining_;
};
bool CowReader::ReadData(const CowOperation& op, IByteSink* sink) {
std::unique_ptr<IDecompressor> decompressor;
switch (op.compression) {
case kCowCompressNone:
decompressor = IDecompressor::Uncompressed();
break;
case kCowCompressGz:
decompressor = IDecompressor::Gz();
break;
case kCowCompressBrotli:
decompressor = IDecompressor::Brotli();
break;
case kCowCompressLz4:
decompressor = IDecompressor::Lz4();
break;
default:
LOG(ERROR) << "Unknown compression type: " << op.compression;
return false;
}
uint64_t offset;
if (op.type == kCowXorOp) {
offset = data_loc_->at(op.new_block);
} else {
offset = op.source;
}
CowDataStream stream(this, offset, op.data_length);
decompressor->set_stream(&stream);
decompressor->set_sink(sink);
return decompressor->Decompress(header_.block_size);
}
ssize_t CowReader::ReadData(const CowOperation& op, void* buffer, size_t buffer_size,
size_t ignore_bytes) {
std::unique_ptr<IDecompressor> decompressor;

View file

@ -25,16 +25,15 @@
namespace android {
namespace snapshot {
class BufferSink : public IByteSink {
class BufferSink final {
public:
void Initialize(size_t size);
void* GetBufPtr() { return buffer_.get(); }
void Clear() { memset(GetBufPtr(), 0, buffer_size_); }
void* GetPayloadBuffer(size_t size);
void* GetBuffer(size_t requested, size_t* actual) override;
void* GetBuffer(size_t requested, size_t* actual);
void UpdateBufferOffset(size_t size) { buffer_offset_ += size; }
struct dm_user_header* GetHeaderPtr();
bool ReturnData(void*, size_t) override { return true; }
void ResetBufferOffset() { buffer_offset_ = 0; }
void* GetPayloadBufPtr();
@ -44,12 +43,12 @@ class BufferSink : public IByteSink {
size_t buffer_size_;
};
class XorSink : public IByteSink {
class XorSink final {
public:
void Initialize(BufferSink* sink, size_t size);
void Reset();
void* GetBuffer(size_t requested, size_t* actual) override;
bool ReturnData(void* buffer, size_t len) override;
void* GetBuffer(size_t requested, size_t* actual);
bool ReturnData(void* buffer, size_t len);
private:
BufferSink* bufsink_;