Merge "libsnapshot: add sequence data" into main

This commit is contained in:
Daniel Zheng 2023-11-21 22:31:57 +00:00 committed by Gerrit Code Review
commit 1056a1172a
5 changed files with 227 additions and 63 deletions

View file

@ -107,9 +107,9 @@ static constexpr uint8_t kNumResumePoints = 4;
struct CowHeaderV3 : public CowHeader {
// Number of sequence data stored (each of which is a 32 byte integer)
uint64_t sequence_data_count;
// number of currently written resume points
// Number of currently written resume points &&
uint32_t resume_point_count;
// Size, in bytes, of the CowResumePoint buffer.
// Number of max resume points that can be written
uint32_t resume_point_max;
// Number of CowOperationV3 structs in the operation buffer, currently and total
// region size.
@ -232,19 +232,23 @@ static inline uint64_t GetCowOpSourceInfoData(const CowOperation& op) {
return op.source_info & kCowOpSourceInfoDataMask;
}
static constexpr off_t GetOpOffset(uint32_t op_index, const CowHeaderV3 header) {
return header.prefix.header_size + header.buffer_size +
(header.resume_point_max * sizeof(ResumePoint)) + (op_index * sizeof(CowOperationV3));
}
static constexpr off_t GetDataOffset(const CowHeaderV3 header) {
return header.prefix.header_size + header.buffer_size +
(header.resume_point_max * sizeof(ResumePoint)) +
header.op_count_max * sizeof(CowOperation);
}
static constexpr off_t GetResumeOffset(const CowHeaderV3 header) {
static constexpr off_t GetSequenceOffset(const CowHeaderV3& header) {
return header.prefix.header_size + header.buffer_size;
}
static constexpr off_t GetResumeOffset(const CowHeaderV3& header) {
return GetSequenceOffset(header) + (header.sequence_data_count * sizeof(uint32_t));
}
static constexpr off_t GetOpOffset(uint32_t op_index, const CowHeaderV3& header) {
return GetResumeOffset(header) + (header.resume_point_max * sizeof(ResumePoint)) +
(op_index * sizeof(CowOperationV3));
}
static constexpr off_t GetDataOffset(const CowHeaderV3& header) {
return GetOpOffset(header.op_count_max, header);
}
struct CowFooter {
CowFooterOperation op;
uint8_t unused[64];

View file

@ -16,8 +16,6 @@
#include <stdint.h>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <unordered_map>
@ -169,6 +167,12 @@ class CowReader final : public ICowReader {
private:
bool ParseV2(android::base::borrowed_fd fd, std::optional<uint64_t> label);
bool PrepMergeOps();
// sequence data is stored as an operation with actual data residing in the data offset.
bool GetSequenceDataV2(std::vector<uint32_t>* merge_op_blocks, std::vector<int>* other_ops,
std::unordered_map<uint32_t, int>* block_map);
// v3 of the cow writes sequence data within its own separate sequence buffer.
bool GetSequenceData(std::vector<uint32_t>* merge_op_blocks, std::vector<int>* other_ops,
std::unordered_map<uint32_t, int>* block_map);
uint64_t FindNumCopyops();
uint8_t GetCompressionType();

View file

@ -24,6 +24,7 @@
#include <android-base/file.h>
#include <android-base/logging.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_reader.h>
#include <zlib.h>
@ -265,52 +266,31 @@ bool CowReader::Parse(android::base::borrowed_fd fd, std::optional<uint64_t> lab
// Replace-op-4, Zero-op-9, Replace-op-5 }
//==============================================================
bool CowReader::PrepMergeOps() {
auto merge_op_blocks = std::make_unique<std::vector<uint32_t>>();
std::vector<int> other_ops;
auto seq_ops_set = std::unordered_set<uint32_t>();
auto block_map = std::make_unique<std::unordered_map<uint32_t, int>>();
size_t num_seqs = 0;
size_t read;
std::vector<uint32_t> merge_op_blocks;
std::unordered_map<uint32_t, int> block_map;
for (size_t i = 0; i < ops_->size(); i++) {
auto& current_op = ops_->data()[i];
if (current_op.type == kCowSequenceOp) {
size_t seq_len = current_op.data_length / sizeof(uint32_t);
merge_op_blocks->resize(merge_op_blocks->size() + seq_len);
if (!GetRawBytes(&current_op, &merge_op_blocks->data()[num_seqs],
current_op.data_length, &read)) {
PLOG(ERROR) << "Failed to read sequence op!";
return false;
}
for (size_t j = num_seqs; j < num_seqs + seq_len; j++) {
seq_ops_set.insert(merge_op_blocks->data()[j]);
}
num_seqs += seq_len;
}
if (IsMetadataOp(current_op)) {
continue;
}
// Sequence ops must be the first ops in the stream.
if (seq_ops_set.empty() && IsOrderedOp(current_op)) {
merge_op_blocks->emplace_back(current_op.new_block);
} else if (seq_ops_set.count(current_op.new_block) == 0) {
other_ops.push_back(current_op.new_block);
}
block_map->insert({current_op.new_block, i});
switch (header_.prefix.major_version) {
case 1:
case 2:
GetSequenceDataV2(&merge_op_blocks, &other_ops, &block_map);
break;
case 3:
GetSequenceData(&merge_op_blocks, &other_ops, &block_map);
break;
default:
break;
}
for (auto block : *merge_op_blocks) {
if (block_map->count(block) == 0) {
for (auto block : merge_op_blocks) {
if (block_map.count(block) == 0) {
LOG(ERROR) << "Invalid Sequence Ops. Could not find Cow Op for new block " << block;
return false;
}
}
if (merge_op_blocks->size() > header_.num_merge_ops) {
num_ordered_ops_to_merge_ = merge_op_blocks->size() - header_.num_merge_ops;
if (merge_op_blocks.size() > header_.num_merge_ops) {
num_ordered_ops_to_merge_ = merge_op_blocks.size() - header_.num_merge_ops;
} else {
num_ordered_ops_to_merge_ = 0;
}
@ -326,9 +306,9 @@ bool CowReader::PrepMergeOps() {
std::sort(other_ops.begin(), other_ops.end(), std::greater<int>());
}
merge_op_blocks->insert(merge_op_blocks->end(), other_ops.begin(), other_ops.end());
merge_op_blocks.insert(merge_op_blocks.end(), other_ops.begin(), other_ops.end());
num_total_data_ops_ = merge_op_blocks->size();
num_total_data_ops_ = merge_op_blocks.size();
if (header_.num_merge_ops > 0) {
merge_op_start_ = header_.num_merge_ops;
}
@ -338,24 +318,94 @@ bool CowReader::PrepMergeOps() {
// the ops vector as required for merge operations.
auto merge_ops_buffer = std::make_shared<std::vector<CowOperation>>();
merge_ops_buffer->reserve(num_total_data_ops_);
for (auto block : *merge_op_blocks) {
merge_ops_buffer->emplace_back(ops_->data()[block_map->at(block)]);
for (auto block : merge_op_blocks) {
merge_ops_buffer->emplace_back(ops_->data()[block_map.at(block)]);
}
ops_->clear();
ops_ = merge_ops_buffer;
ops_->shrink_to_fit();
} else {
for (auto block : *merge_op_blocks) {
block_pos_index_->push_back(block_map->at(block));
for (auto block : merge_op_blocks) {
block_pos_index_->push_back(block_map.at(block));
}
}
block_map->clear();
merge_op_blocks->clear();
block_map.clear();
merge_op_blocks.clear();
return true;
}
bool CowReader::GetSequenceDataV2(std::vector<uint32_t>* merge_op_blocks,
std::vector<int>* other_ops,
std::unordered_map<uint32_t, int>* block_map) {
auto seq_ops_set = std::unordered_set<uint32_t>();
size_t num_seqs = 0;
size_t read;
for (size_t i = 0; i < ops_->size(); i++) {
auto& current_op = ops_->data()[i];
if (current_op.type == kCowSequenceOp) {
size_t seq_len = current_op.data_length / sizeof(uint32_t);
merge_op_blocks->resize(merge_op_blocks->size() + seq_len);
if (!GetRawBytes(&current_op, &merge_op_blocks->data()[num_seqs],
current_op.data_length, &read)) {
PLOG(ERROR) << "Failed to read sequence op!";
return false;
}
for (size_t j = num_seqs; j < num_seqs + seq_len; j++) {
seq_ops_set.insert(merge_op_blocks->at(j));
}
num_seqs += seq_len;
}
if (IsMetadataOp(current_op)) {
continue;
}
// Sequence ops must be the first ops in the stream.
if (seq_ops_set.empty() && IsOrderedOp(current_op)) {
merge_op_blocks->emplace_back(current_op.new_block);
} else if (seq_ops_set.count(current_op.new_block) == 0) {
other_ops->push_back(current_op.new_block);
}
block_map->insert({current_op.new_block, i});
}
return false;
}
bool CowReader::GetSequenceData(std::vector<uint32_t>* merge_op_blocks, std::vector<int>* other_ops,
std::unordered_map<uint32_t, int>* block_map) {
std::unordered_set<uint32_t> seq_ops_set;
// read sequence ops data
merge_op_blocks->resize(header_.sequence_data_count);
if (!android::base::ReadFullyAtOffset(
fd_, merge_op_blocks->data(),
header_.sequence_data_count * sizeof(merge_op_blocks->at(0)),
GetSequenceOffset(header_))) {
PLOG(ERROR) << "failed to read sequence buffer. seq_data_count: "
<< header_.sequence_data_count << " at offset: " << GetSequenceOffset(header_);
return false;
}
seq_ops_set.reserve(merge_op_blocks->size());
for (auto& i : *merge_op_blocks) {
seq_ops_set.insert(i);
}
// read ordered op data
for (size_t i = 0; i < ops_->size(); i++) {
auto& current_op = ops_->data()[i];
// Sequence ops must be the first ops in the stream.
if (seq_ops_set.empty()) {
merge_op_blocks->emplace_back(current_op.new_block);
} else if (seq_ops_set.count(current_op.new_block) == 0) {
other_ops->push_back(current_op.new_block);
}
block_map->insert({current_op.new_block, i});
}
return true;
}
bool CowReader::VerifyMergeOps() {
auto itr = GetMergeOpIter(true);
std::unordered_map<uint64_t, const CowOperation*> overwritten_blocks;

View file

@ -520,5 +520,106 @@ TEST_F(CowTestV3, BufferMetadataSyncTest) {
resume_point_max = 4;
*/
}
TEST_F(CowTestV3, SequenceTest) {
CowOptions options;
options.op_count_max = std::numeric_limits<uint32_t>::max();
auto writer = CreateCowWriter(3, options, GetCowFd());
// sequence data. This just an arbitrary set of integers that specify the merge order. The
// actual calculation is done by update_engine and passed to writer. All we care about here is
// writing that data correctly
const int seq_len = std::numeric_limits<uint16_t>::max() / sizeof(uint32_t) + 1;
uint32_t sequence[seq_len];
for (int i = 0; i < seq_len; i++) {
sequence[i] = i + 1;
}
ASSERT_TRUE(writer->AddSequenceData(seq_len, sequence));
ASSERT_TRUE(writer->AddZeroBlocks(1, seq_len));
ASSERT_TRUE(writer->Finalize());
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
CowReader reader;
ASSERT_TRUE(reader.Parse(cow_->fd));
auto iter = reader.GetRevMergeOpIter();
for (int i = 0; i < seq_len; i++) {
ASSERT_TRUE(!iter->AtEnd());
const auto& op = iter->Get();
ASSERT_EQ(op->new_block, seq_len - i);
iter->Next();
}
ASSERT_TRUE(iter->AtEnd());
}
TEST_F(CowTestV3, MissingSeqOp) {
CowOptions options;
options.op_count_max = std::numeric_limits<uint32_t>::max();
auto writer = CreateCowWriter(3, options, GetCowFd());
const int seq_len = 10;
uint32_t sequence[seq_len];
for (int i = 0; i < seq_len; i++) {
sequence[i] = i + 1;
}
ASSERT_TRUE(writer->AddSequenceData(seq_len, sequence));
ASSERT_TRUE(writer->AddZeroBlocks(1, seq_len - 1));
ASSERT_TRUE(writer->Finalize());
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
CowReader reader;
ASSERT_FALSE(reader.Parse(cow_->fd));
}
TEST_F(CowTestV3, ResumeSeqOp) {
CowOptions options;
options.op_count_max = std::numeric_limits<uint32_t>::max();
auto writer = std::make_unique<CowWriterV3>(options, GetCowFd());
const int seq_len = 10;
uint32_t sequence[seq_len];
for (int i = 0; i < seq_len; i++) {
sequence[i] = i + 1;
}
ASSERT_TRUE(writer->Initialize());
ASSERT_TRUE(writer->AddSequenceData(seq_len, sequence));
ASSERT_TRUE(writer->AddZeroBlocks(1, seq_len / 2));
ASSERT_TRUE(writer->AddLabel(1));
ASSERT_TRUE(writer->AddZeroBlocks(1 + seq_len / 2, 1));
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
auto reader = std::make_unique<CowReader>();
ASSERT_TRUE(reader->Parse(cow_->fd, 1));
auto itr = reader->GetRevMergeOpIter();
ASSERT_TRUE(itr->AtEnd());
writer = std::make_unique<CowWriterV3>(options, GetCowFd());
ASSERT_TRUE(writer->Initialize({1}));
ASSERT_TRUE(writer->AddZeroBlocks(1 + seq_len / 2, seq_len / 2));
ASSERT_TRUE(writer->Finalize());
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
reader = std::make_unique<CowReader>();
ASSERT_TRUE(reader->Parse(cow_->fd));
auto iter = reader->GetRevMergeOpIter();
uint64_t expected_block = 10;
while (!iter->AtEnd() && expected_block > 0) {
ASSERT_FALSE(iter->AtEnd());
const auto& op = iter->Get();
ASSERT_EQ(op->new_block, expected_block);
iter->Next();
expected_block--;
}
ASSERT_EQ(expected_block, 0);
ASSERT_TRUE(iter->AtEnd());
}
} // namespace snapshot
} // namespace android

View file

@ -316,9 +316,14 @@ bool CowWriterV3::EmitLabel(uint64_t label) {
}
bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
LOG(ERROR) << __LINE__ << " " << __FILE__ << " <- function here should never be called";
if (num_ops && data) return false;
return false;
// TODO: size sequence buffer based on options
header_.sequence_data_count = num_ops;
if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops,
GetSequenceOffset(header_))) {
PLOG(ERROR) << "writing sequence buffer failed";
return false;
}
return true;
}
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {