Merge "Support batch writes for non-compressed ops" into main

This commit is contained in:
Treehugger Robot 2023-12-12 01:43:24 +00:00 committed by Gerrit Code Review
commit 314818029c
4 changed files with 104 additions and 57 deletions

View file

@ -14,7 +14,6 @@
// limitations under the License.
//
#include <inttypes.h>
#include <libsnapshot/cow_format.h>
#include <sys/types.h>
#include <unistd.h>

View file

@ -97,7 +97,7 @@ TEST_F(CowTestV3, MaxOp) {
options.op_count_max = 20;
auto writer = CreateCowWriter(3, options, GetCowFd());
ASSERT_FALSE(writer->AddZeroBlocks(1, 21));
ASSERT_FALSE(writer->AddZeroBlocks(1, 1));
ASSERT_TRUE(writer->AddZeroBlocks(1, 20));
std::string data = "This is some data, believe it";
data.resize(options.block_size, '\0');
@ -184,7 +184,7 @@ TEST_F(CowTestV3, ConsecutiveReplaceOp) {
std::string data;
data.resize(options.block_size * 5);
for (int i = 0; i < data.size(); i++) {
data[i] = char(rand() % 256);
data[i] = static_cast<char>('A' + i / options.block_size);
}
ASSERT_TRUE(writer->AddRawBlocks(5, data.data(), data.size()));
@ -205,19 +205,20 @@ TEST_F(CowTestV3, ConsecutiveReplaceOp) {
ASSERT_FALSE(iter->AtEnd());
size_t i = 0;
std::string sink(data.size(), '\0');
while (!iter->AtEnd()) {
auto op = iter->Get();
std::string sink(options.block_size, '\0');
ASSERT_EQ(op->type(), kCowReplaceOp);
ASSERT_EQ(op->data_length, options.block_size);
ASSERT_EQ(op->new_block, 5 + i);
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
ASSERT_TRUE(ReadData(reader, op, sink.data(), options.block_size));
ASSERT_EQ(std::string_view(sink),
std::string_view(data).substr(i * options.block_size, options.block_size))
<< " readback data for " << i << "th block does not match";
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
ASSERT_EQ(i, 5);
}
@ -372,41 +373,33 @@ TEST_F(CowTestV3, AllOpsWithCompression) {
ASSERT_NE(iter, nullptr);
ASSERT_FALSE(iter->AtEnd());
size_t i = 0;
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowZeroOp);
ASSERT_EQ(op->new_block, 10 + i);
iter->Next();
i++;
}
i = 0;
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowCopyOp);
ASSERT_EQ(op->new_block, 15 + i);
ASSERT_EQ(op->source(), 3 + i);
iter->Next();
i++;
}
i = 0;
std::string sink(data.size(), '\0');
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowReplaceOp);
ASSERT_EQ(op->new_block, 18 + i);
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
ASSERT_EQ(reader.ReadData(op, sink.data() + (i * options.block_size), options.block_size),
options.block_size);
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
i = 0;
std::fill(sink.begin(), sink.end(), '\0');
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowXorOp);
ASSERT_EQ(op->new_block, 50 + i);
@ -414,7 +407,6 @@ TEST_F(CowTestV3, AllOpsWithCompression) {
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
}
@ -671,5 +663,25 @@ TEST_F(CowTestV3, CowSizeEstimate) {
ASSERT_LE(writer.GetCowSize(), cow_size);
}
TEST_F(CowTestV3, CopyOpMany) {
CowOptions options;
options.op_count_max = 100;
CowWriterV3 writer(options, GetCowFd());
writer.Initialize();
ASSERT_TRUE(writer.AddCopy(100, 50, 50));
ASSERT_TRUE(writer.AddCopy(150, 100, 50));
ASSERT_TRUE(writer.Finalize());
CowReader reader;
ASSERT_TRUE(reader.Parse(GetCowFd()));
auto it = reader.GetOpIter();
for (size_t i = 0; i < 100; i++) {
ASSERT_FALSE(it->AtEnd()) << " op iterator ended at " << i;
const auto op = *it->Get();
ASSERT_EQ(op.type(), kCowCopyOp);
ASSERT_EQ(op.new_block, 100 + i);
it->Next();
}
}
} // namespace snapshot
} // namespace android

View file

@ -113,7 +113,13 @@ bool CowWriterV3::ParseOptions() {
}
compression_.algorithm = *algorithm;
compressor_ = ICompressor::Create(compression_, header_.block_size);
if (compression_.algorithm != kCowCompressNone) {
compressor_ = ICompressor::Create(compression_, header_.block_size);
if (compressor_ == nullptr) {
LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
return false;
}
}
return true;
}
@ -207,14 +213,15 @@ bool CowWriterV3::OpenForAppend(uint64_t label) {
}
bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) {
CowOperationV3 op{};
CowOperationV3& op = ops[i];
op.set_type(kCowCopyOp);
op.new_block = new_block + i;
op.set_source(old_block + i);
if (!WriteOperation(op)) {
return false;
}
}
if (!WriteOperation({ops.data(), ops.size()}, {})) {
return false;
}
return true;
@ -231,12 +238,37 @@ bool CowWriterV3::EmitXorBlocks(uint32_t new_block_start, const void* data, size
bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
uint64_t old_block, uint16_t offset, CowOperationType type) {
if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
LOG(ERROR) << "Compression algorithm is " << compression_.algorithm
<< " but compressor is uninitialized.";
return false;
}
const size_t num_blocks = (size / header_.block_size);
if (compression_.algorithm == kCowCompressNone) {
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) {
CowOperation& op = ops[i];
op.new_block = new_block_start + i;
op.set_type(type);
if (type == kCowXorOp) {
op.set_source((old_block + i) * header_.block_size + offset);
} else {
op.set_source(next_data_pos_ + header_.block_size * i);
}
op.data_length = header_.block_size;
}
return WriteOperation({ops.data(), ops.size()},
{reinterpret_cast<const uint8_t*>(data), size});
}
const auto saved_op_count = header_.op_count;
const auto saved_data_pos = next_data_pos_;
for (size_t i = 0; i < num_blocks; i++) {
const uint8_t* const iter =
reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
CowOperation op = {};
CowOperation op{};
op.new_block = new_block_start + i;
op.set_type(type);
@ -245,25 +277,21 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
} else {
op.set_source(next_data_pos_);
}
std::basic_string<uint8_t> compressed_data;
const void* out_data = iter;
op.data_length = header_.block_size;
if (compression_.algorithm) {
if (!compressor_) {
PLOG(ERROR) << "Compressor not initialized";
return false;
}
compressed_data = compressor_->Compress(out_data, header_.block_size);
if (compressed_data.size() < op.data_length) {
out_data = compressed_data.data();
op.data_length = compressed_data.size();
}
const std::basic_string<uint8_t> compressed_data =
compressor_->Compress(out_data, header_.block_size);
if (compressed_data.size() < op.data_length) {
out_data = compressed_data.data();
op.data_length = compressed_data.size();
}
if (!WriteOperation(op, out_data, op.data_length)) {
PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
<< new_block_start << " compression: " << compression_.algorithm;
header_.op_count = saved_op_count;
next_data_pos_ = saved_data_pos;
return false;
}
}
@ -272,13 +300,14 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
}
bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
std::vector<CowOperationV3> ops(num_blocks);
for (uint64_t i = 0; i < num_blocks; i++) {
CowOperationV3 op{};
CowOperationV3& op = ops[i];
op.set_type(kCowZeroOp);
op.new_block = new_block_start + i;
if (!WriteOperation(op)) {
return false;
}
}
if (!WriteOperation({ops.data(), ops.size()}, {})) {
return false;
}
return true;
}
@ -324,43 +353,48 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
return true;
}
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
std::basic_string_view<uint8_t> data) {
if (IsEstimating()) {
header_.op_count++;
header_.op_count += ops.size();
if (header_.op_count > header_.op_count_max) {
// If we increment op_count_max, the offset of data section would
// change. So need to update |next_data_pos_|
next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
header_.op_count_max = header_.op_count;
}
next_data_pos_ += op.data_length;
next_data_pos_ += data.size();
return true;
}
if (header_.op_count + 1 > header_.op_count_max) {
LOG(ERROR) << "Maximum number of ops reached: " << header_.op_count_max;
if (header_.op_count + ops.size() > header_.op_count_max) {
LOG(ERROR) << "Current op count " << header_.op_count << ", attempting to write "
<< ops.size() << " ops will exceed the max of " << header_.op_count_max;
return false;
}
const off_t offset = GetOpOffset(header_.op_count, header_);
if (!android::base::WriteFullyAtOffset(fd_, &op, sizeof(op), offset)) {
PLOG(ERROR) << "write failed for " << op << " at " << offset;
if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
return false;
}
if (data && size > 0) {
if (!android::base::WriteFullyAtOffset(fd_, data, size, next_data_pos_)) {
PLOG(ERROR) << "write failed for data of size: " << size
if (!data.empty()) {
if (!android::base::WriteFullyAtOffset(fd_, data.data(), data.size(), next_data_pos_)) {
PLOG(ERROR) << "write failed for data of size: " << data.size()
<< " at offset: " << next_data_pos_;
return false;
}
}
header_.op_count++;
next_data_pos_ += op.data_length;
next_op_pos_ += sizeof(CowOperationV3);
header_.op_count += ops.size();
next_data_pos_ += data.size();
return true;
}
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
return WriteOperation({&op, 1}, {reinterpret_cast<const uint8_t*>(data), size});
}
bool CowWriterV3::Finalize() {
CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
CHECK_LE(header_.prefix.header_size, sizeof(header_));

View file

@ -15,6 +15,7 @@
#pragma once
#include <android-base/logging.h>
#include <string_view>
#include "writer_base.h"
@ -44,6 +45,8 @@ class CowWriterV3 : public CowWriterBase {
bool ParseOptions();
bool OpenForWrite();
bool OpenForAppend(uint64_t label);
bool WriteOperation(std::basic_string_view<CowOperationV3> op,
std::basic_string_view<uint8_t> data);
bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, CowOperationType type);
@ -59,7 +62,6 @@ class CowWriterV3 : public CowWriterBase {
// Resume points contain a laebl + cow_op_index.
std::shared_ptr<std::vector<ResumePoint>> resume_points_;
uint64_t next_op_pos_ = 0;
uint64_t next_data_pos_ = 0;
std::vector<std::basic_string<uint8_t>> compressed_buf_;