Support batch writes for non-compressed ops

This also improves atomicity of ops. If a single Add*Blocks() call
with 100 blocks failed in the middle, partially written blocks would be
discarded, and op count on disk stays unchanged. Previously wew ould
update the op count on disk with partially written blocks, causing
labels to be inaccurate.

Test: th
Bug: 313962438
Change-Id: If175a705f6ec46c1b25c52d0d9f02f01a540ce55
This commit is contained in:
Kelvin Zhang 2023-12-06 16:37:02 -08:00
parent 73ade16187
commit 84e5e6f751
4 changed files with 104 additions and 57 deletions

View file

@ -14,7 +14,6 @@
// limitations under the License.
//
#include <inttypes.h>
#include <libsnapshot/cow_format.h>
#include <sys/types.h>
#include <unistd.h>

View file

@ -97,7 +97,7 @@ TEST_F(CowTestV3, MaxOp) {
options.op_count_max = 20;
auto writer = CreateCowWriter(3, options, GetCowFd());
ASSERT_FALSE(writer->AddZeroBlocks(1, 21));
ASSERT_FALSE(writer->AddZeroBlocks(1, 1));
ASSERT_TRUE(writer->AddZeroBlocks(1, 20));
std::string data = "This is some data, believe it";
data.resize(options.block_size, '\0');
@ -184,7 +184,7 @@ TEST_F(CowTestV3, ConsecutiveReplaceOp) {
std::string data;
data.resize(options.block_size * 5);
for (int i = 0; i < data.size(); i++) {
data[i] = char(rand() % 256);
data[i] = static_cast<char>('A' + i / options.block_size);
}
ASSERT_TRUE(writer->AddRawBlocks(5, data.data(), data.size()));
@ -205,19 +205,20 @@ TEST_F(CowTestV3, ConsecutiveReplaceOp) {
ASSERT_FALSE(iter->AtEnd());
size_t i = 0;
std::string sink(data.size(), '\0');
while (!iter->AtEnd()) {
auto op = iter->Get();
std::string sink(options.block_size, '\0');
ASSERT_EQ(op->type(), kCowReplaceOp);
ASSERT_EQ(op->data_length, options.block_size);
ASSERT_EQ(op->new_block, 5 + i);
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
ASSERT_TRUE(ReadData(reader, op, sink.data(), options.block_size));
ASSERT_EQ(std::string_view(sink),
std::string_view(data).substr(i * options.block_size, options.block_size))
<< " readback data for " << i << "th block does not match";
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
ASSERT_EQ(i, 5);
}
@ -372,41 +373,33 @@ TEST_F(CowTestV3, AllOpsWithCompression) {
ASSERT_NE(iter, nullptr);
ASSERT_FALSE(iter->AtEnd());
size_t i = 0;
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowZeroOp);
ASSERT_EQ(op->new_block, 10 + i);
iter->Next();
i++;
}
i = 0;
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowCopyOp);
ASSERT_EQ(op->new_block, 15 + i);
ASSERT_EQ(op->source(), 3 + i);
iter->Next();
i++;
}
i = 0;
std::string sink(data.size(), '\0');
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowReplaceOp);
ASSERT_EQ(op->new_block, 18 + i);
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
ASSERT_EQ(reader.ReadData(op, sink.data() + (i * options.block_size), options.block_size),
options.block_size);
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
i = 0;
std::fill(sink.begin(), sink.end(), '\0');
while (i < 5) {
for (size_t i = 0; i < 5; i++) {
auto op = iter->Get();
ASSERT_EQ(op->type(), kCowXorOp);
ASSERT_EQ(op->new_block, 50 + i);
@ -414,7 +407,6 @@ TEST_F(CowTestV3, AllOpsWithCompression) {
ASSERT_TRUE(
ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
iter->Next();
i++;
}
ASSERT_EQ(sink, data);
}
@ -671,5 +663,25 @@ TEST_F(CowTestV3, CowSizeEstimate) {
ASSERT_LE(writer.GetCowSize(), cow_size);
}
TEST_F(CowTestV3, CopyOpMany) {
CowOptions options;
options.op_count_max = 100;
CowWriterV3 writer(options, GetCowFd());
writer.Initialize();
ASSERT_TRUE(writer.AddCopy(100, 50, 50));
ASSERT_TRUE(writer.AddCopy(150, 100, 50));
ASSERT_TRUE(writer.Finalize());
CowReader reader;
ASSERT_TRUE(reader.Parse(GetCowFd()));
auto it = reader.GetOpIter();
for (size_t i = 0; i < 100; i++) {
ASSERT_FALSE(it->AtEnd()) << " op iterator ended at " << i;
const auto op = *it->Get();
ASSERT_EQ(op.type(), kCowCopyOp);
ASSERT_EQ(op.new_block, 100 + i);
it->Next();
}
}
} // namespace snapshot
} // namespace android

View file

@ -113,7 +113,13 @@ bool CowWriterV3::ParseOptions() {
}
compression_.algorithm = *algorithm;
compressor_ = ICompressor::Create(compression_, header_.block_size);
if (compression_.algorithm != kCowCompressNone) {
compressor_ = ICompressor::Create(compression_, header_.block_size);
if (compressor_ == nullptr) {
LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
return false;
}
}
return true;
}
@ -207,14 +213,15 @@ bool CowWriterV3::OpenForAppend(uint64_t label) {
}
bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) {
CowOperationV3 op{};
CowOperationV3& op = ops[i];
op.set_type(kCowCopyOp);
op.new_block = new_block + i;
op.set_source(old_block + i);
if (!WriteOperation(op)) {
return false;
}
}
if (!WriteOperation({ops.data(), ops.size()}, {})) {
return false;
}
return true;
@ -231,12 +238,37 @@ bool CowWriterV3::EmitXorBlocks(uint32_t new_block_start, const void* data, size
bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
uint64_t old_block, uint16_t offset, CowOperationType type) {
if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
LOG(ERROR) << "Compression algorithm is " << compression_.algorithm
<< " but compressor is uninitialized.";
return false;
}
const size_t num_blocks = (size / header_.block_size);
if (compression_.algorithm == kCowCompressNone) {
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) {
CowOperation& op = ops[i];
op.new_block = new_block_start + i;
op.set_type(type);
if (type == kCowXorOp) {
op.set_source((old_block + i) * header_.block_size + offset);
} else {
op.set_source(next_data_pos_ + header_.block_size * i);
}
op.data_length = header_.block_size;
}
return WriteOperation({ops.data(), ops.size()},
{reinterpret_cast<const uint8_t*>(data), size});
}
const auto saved_op_count = header_.op_count;
const auto saved_data_pos = next_data_pos_;
for (size_t i = 0; i < num_blocks; i++) {
const uint8_t* const iter =
reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
CowOperation op = {};
CowOperation op{};
op.new_block = new_block_start + i;
op.set_type(type);
@ -245,25 +277,21 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
} else {
op.set_source(next_data_pos_);
}
std::basic_string<uint8_t> compressed_data;
const void* out_data = iter;
op.data_length = header_.block_size;
if (compression_.algorithm) {
if (!compressor_) {
PLOG(ERROR) << "Compressor not initialized";
return false;
}
compressed_data = compressor_->Compress(out_data, header_.block_size);
if (compressed_data.size() < op.data_length) {
out_data = compressed_data.data();
op.data_length = compressed_data.size();
}
const std::basic_string<uint8_t> compressed_data =
compressor_->Compress(out_data, header_.block_size);
if (compressed_data.size() < op.data_length) {
out_data = compressed_data.data();
op.data_length = compressed_data.size();
}
if (!WriteOperation(op, out_data, op.data_length)) {
PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
<< new_block_start << " compression: " << compression_.algorithm;
header_.op_count = saved_op_count;
next_data_pos_ = saved_data_pos;
return false;
}
}
@ -272,13 +300,14 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
}
bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
std::vector<CowOperationV3> ops(num_blocks);
for (uint64_t i = 0; i < num_blocks; i++) {
CowOperationV3 op{};
CowOperationV3& op = ops[i];
op.set_type(kCowZeroOp);
op.new_block = new_block_start + i;
if (!WriteOperation(op)) {
return false;
}
}
if (!WriteOperation({ops.data(), ops.size()}, {})) {
return false;
}
return true;
}
@ -324,43 +353,48 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
return true;
}
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
std::basic_string_view<uint8_t> data) {
if (IsEstimating()) {
header_.op_count++;
header_.op_count += ops.size();
if (header_.op_count > header_.op_count_max) {
// If we increment op_count_max, the offset of data section would
// change. So need to update |next_data_pos_|
next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
header_.op_count_max = header_.op_count;
}
next_data_pos_ += op.data_length;
next_data_pos_ += data.size();
return true;
}
if (header_.op_count + 1 > header_.op_count_max) {
LOG(ERROR) << "Maximum number of ops reached: " << header_.op_count_max;
if (header_.op_count + ops.size() > header_.op_count_max) {
LOG(ERROR) << "Current op count " << header_.op_count << ", attempting to write "
<< ops.size() << " ops will exceed the max of " << header_.op_count_max;
return false;
}
const off_t offset = GetOpOffset(header_.op_count, header_);
if (!android::base::WriteFullyAtOffset(fd_, &op, sizeof(op), offset)) {
PLOG(ERROR) << "write failed for " << op << " at " << offset;
if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
return false;
}
if (data && size > 0) {
if (!android::base::WriteFullyAtOffset(fd_, data, size, next_data_pos_)) {
PLOG(ERROR) << "write failed for data of size: " << size
if (!data.empty()) {
if (!android::base::WriteFullyAtOffset(fd_, data.data(), data.size(), next_data_pos_)) {
PLOG(ERROR) << "write failed for data of size: " << data.size()
<< " at offset: " << next_data_pos_;
return false;
}
}
header_.op_count++;
next_data_pos_ += op.data_length;
next_op_pos_ += sizeof(CowOperationV3);
header_.op_count += ops.size();
next_data_pos_ += data.size();
return true;
}
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
return WriteOperation({&op, 1}, {reinterpret_cast<const uint8_t*>(data), size});
}
bool CowWriterV3::Finalize() {
CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
CHECK_LE(header_.prefix.header_size, sizeof(header_));

View file

@ -15,6 +15,7 @@
#pragma once
#include <android-base/logging.h>
#include <string_view>
#include "writer_base.h"
@ -44,6 +45,8 @@ class CowWriterV3 : public CowWriterBase {
bool ParseOptions();
bool OpenForWrite();
bool OpenForAppend(uint64_t label);
bool WriteOperation(std::basic_string_view<CowOperationV3> op,
std::basic_string_view<uint8_t> data);
bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, CowOperationType type);
@ -59,7 +62,6 @@ class CowWriterV3 : public CowWriterBase {
// Resume points contain a laebl + cow_op_index.
std::shared_ptr<std::vector<ResumePoint>> resume_points_;
uint64_t next_op_pos_ = 0;
uint64_t next_data_pos_ = 0;
std::vector<std::basic_string<uint8_t>> compressed_buf_;