Merge "Refactor off V2 Cow Ops" into main

This commit is contained in:
Daniel Zheng 2023-10-05 19:12:56 +00:00 committed by Gerrit Code Review
commit adad3dbefe
9 changed files with 219 additions and 149 deletions

View file

@ -22,6 +22,9 @@
namespace android {
namespace snapshot {
struct CowOperationV3;
typedef CowOperationV3 CowOperation;
static constexpr uint64_t kCowMagicNumber = 0x436f77634f572121ULL;
static constexpr uint32_t kCowVersionMajor = 2;
static constexpr uint32_t kCowVersionMinor = 0;
@ -109,43 +112,7 @@ struct CowFooterOperation {
uint64_t num_ops;
} __attribute__((packed));
// Cow operations are currently fixed-size entries, but this may change if
// needed.
struct CowOperation {
// The operation code (see the constants and structures below).
uint8_t type;
// If this operation reads from the data section of the COW, this contains
// the compression type of that data (see constants below).
uint8_t compression;
// If this operation reads from the data section of the COW, this contains
// the length.
uint16_t data_length;
// The block of data in the new image that this operation modifies.
uint64_t new_block;
// The value of |source| depends on the operation code.
//
// For copy operations, this is a block location in the source image.
//
// For replace operations, this is a byte offset within the COW's data
// sections (eg, not landing within the header or metadata). It is an
// absolute position within the image.
//
// For zero operations (replace with all zeroes), this is unused and must
// be zero.
//
// For Label operations, this is the value of the applied label.
//
// For Cluster operations, this is the length of the following data region
//
// For Xor operations, this is the byte location in the source image.
uint64_t source;
} __attribute__((packed));
// The on disk format of cow (currently == CowOperation)
// V2 version of COW. On disk format for older devices
struct CowOperationV2 {
// The operation code (see the constants and structures below).
uint8_t type;
@ -180,8 +147,33 @@ struct CowOperationV2 {
uint64_t source;
} __attribute__((packed));
static_assert(sizeof(CowOperationV2) == sizeof(CowOperation));
static_assert(sizeof(CowOperation) == sizeof(CowFooterOperation));
// The on disk format of cow (currently == CowOperation)
struct CowOperationV3 {
// The operation code (see the constants and structures below).
uint8_t type;
// If this operation reads from the data section of the COW, this contains
// the length.
uint16_t data_length;
// The block of data in the new image that this operation modifies.
uint32_t new_block;
// The value of |source| depends on the operation code.
//
// CopyOp: a 32-bit block location in the source image.
// ReplaceOp: an absolute byte offset within the COW's data section.
// XorOp: an absolute byte offset in the source image.
// ZeroOp: unused
// LabelOp: a 64-bit opaque identifier.
//
// For ops other than Label:
// Bits 47-62 are reserved and must be zero.
// A block is compressed if its data is < block_sz
uint64_t source_info;
} __attribute__((packed));
static_assert(sizeof(CowOperationV2) == sizeof(CowFooterOperation));
static constexpr uint8_t kCowCopyOp = 1;
static constexpr uint8_t kCowReplaceOp = 2;
@ -208,11 +200,14 @@ static constexpr uint8_t kCowReadAheadNotStarted = 0;
static constexpr uint8_t kCowReadAheadInProgress = 1;
static constexpr uint8_t kCowReadAheadDone = 2;
static constexpr uint64_t kCowOpSourceInfoDataMask = (1ULL << 48) - 1;
static constexpr uint64_t kCowOpSourceInfoCompressBit = (1ULL << 63);
static inline uint64_t GetCowOpSourceInfoData(const CowOperation* op) {
return op->source;
return op->source_info & kCowOpSourceInfoDataMask;
}
static inline bool GetCowOpSourceInfoCompression(const CowOperation* op) {
return op->compression != kCowCompressNone;
return !!(op->source_info & kCowOpSourceInfoCompressBit);
}
struct CowFooter {
@ -236,10 +231,12 @@ struct BufferState {
// 2MB Scratch space used for read-ahead
static constexpr uint64_t BUFFER_REGION_DEFAULT_SIZE = (1ULL << 21);
std::ostream& operator<<(std::ostream& os, CowOperationV2 const& arg);
std::ostream& operator<<(std::ostream& os, CowOperation const& arg);
int64_t GetNextOpOffset(const CowOperation& op, uint32_t cluster_size);
int64_t GetNextDataOffset(const CowOperation& op, uint32_t cluster_size);
int64_t GetNextOpOffset(const CowOperationV2& op, uint32_t cluster_size);
int64_t GetNextDataOffset(const CowOperationV2& op, uint32_t cluster_size);
// Ops that are internal to the Cow Format and not OTA data
bool IsMetadataOp(const CowOperation& op);

View file

@ -165,9 +165,10 @@ class CowReader final : public ICowReader {
void UpdateMergeOpsCompleted(int num_merge_ops) { header_.num_merge_ops += num_merge_ops; }
private:
bool ParseV2(android::base::borrowed_fd fd, std::optional<uint64_t> label);
bool PrepMergeOps();
uint64_t FindNumCopyops();
uint8_t GetCompressionType(const CowOperation* op);
uint8_t GetCompressionType();
android::base::unique_fd owned_fd_;
android::base::borrowed_fd fd_;
@ -184,6 +185,7 @@ class CowReader final : public ICowReader {
std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
ReaderFlags reader_flag_;
bool is_merge_{};
uint8_t compression_type_ = kCowCompressNone;
};
} // namespace snapshot

View file

@ -14,11 +14,14 @@
// limitations under the License.
//
#include <inttypes.h>
#include <libsnapshot/cow_format.h>
#include <sys/types.h>
#include <unistd.h>
#include <android-base/logging.h>
#include <android-base/stringprintf.h>
#include <libsnapshot/cow_format.h>
#include "writer_v2.h"
namespace android {
@ -26,45 +29,82 @@ namespace snapshot {
using android::base::unique_fd;
std::ostream& operator<<(std::ostream& os, CowOperation const& op) {
os << "CowOperation(type:";
if (op.type == kCowCopyOp)
os << "kCowCopyOp, ";
else if (op.type == kCowReplaceOp)
os << "kCowReplaceOp, ";
else if (op.type == kCowZeroOp)
os << "kZeroOp, ";
else if (op.type == kCowFooterOp)
os << "kCowFooterOp, ";
else if (op.type == kCowLabelOp)
os << "kCowLabelOp, ";
else if (op.type == kCowClusterOp)
os << "kCowClusterOp ";
else if (op.type == kCowXorOp)
os << "kCowXorOp ";
else if (op.type == kCowSequenceOp)
os << "kCowSequenceOp ";
else if (op.type == kCowFooterOp)
os << "kCowFooterOp ";
else
os << (int)op.type << "?,";
os << "compression:";
if (op.compression == kCowCompressNone)
os << "kCowCompressNone, ";
else if (op.compression == kCowCompressGz)
os << "kCowCompressGz, ";
else if (op.compression == kCowCompressBrotli)
os << "kCowCompressBrotli, ";
else
os << (int)op.compression << "?, ";
os << "data_length:" << op.data_length << ",\t";
os << "new_block:" << op.new_block << ",\t";
std::ostream& EmitCowTypeString(std::ostream& os, uint8_t cow_type) {
switch (cow_type) {
case kCowCopyOp:
return os << "kCowCopyOp";
case kCowReplaceOp:
return os << "kCowReplaceOp";
case kCowZeroOp:
return os << "kZeroOp";
case kCowFooterOp:
return os << "kCowFooterOp";
case kCowLabelOp:
return os << "kCowLabelOp";
case kCowClusterOp:
return os << "kCowClusterOp";
case kCowXorOp:
return os << "kCowXorOp";
case kCowSequenceOp:
return os << "kCowSequenceOp";
default:
return os << (int)cow_type << "unknown";
}
}
std::ostream& operator<<(std::ostream& os, CowOperationV2 const& op) {
os << "CowOperationV2(";
EmitCowTypeString(os, op.type) << ", ";
switch (op.compression) {
case kCowCompressNone:
os << "uncompressed, ";
break;
case kCowCompressGz:
os << "gz, ";
break;
case kCowCompressBrotli:
os << "brotli, ";
break;
case kCowCompressLz4:
os << "lz4, ";
break;
case kCowCompressZstd:
os << "zstd, ";
break;
}
os << "data_length:" << op.data_length << ", ";
os << "new_block:" << op.new_block << ", ";
os << "source:" << op.source;
os << ")";
return os;
}
int64_t GetNextOpOffset(const CowOperation& op, uint32_t cluster_ops) {
std::ostream& operator<<(std::ostream& os, CowOperation const& op) {
os << "CowOperation(";
EmitCowTypeString(os, op.type);
if (op.type == kCowReplaceOp || op.type == kCowXorOp || op.type == kCowSequenceOp) {
if (op.source_info & kCowOpSourceInfoCompressBit) {
os << ", compressed";
} else {
os << ", uncompressed";
}
os << ", data_length:" << op.data_length;
}
if (op.type != kCowClusterOp && op.type != kCowSequenceOp && op.type != kCowLabelOp) {
os << ", new_block:" << op.new_block;
}
if (op.type == kCowXorOp || op.type == kCowReplaceOp || op.type == kCowCopyOp) {
os << ", source:" << (op.source_info & kCowOpSourceInfoDataMask);
} else if (op.type == kCowClusterOp) {
os << ", cluster_data:" << (op.source_info & kCowOpSourceInfoDataMask);
} else {
os << ", label:0x" << android::base::StringPrintf("%" PRIx64, op.source_info);
}
os << ")";
return os;
}
int64_t GetNextOpOffset(const CowOperationV2& op, uint32_t cluster_ops) {
if (op.type == kCowClusterOp) {
return op.source;
} else if ((op.type == kCowReplaceOp || op.type == kCowXorOp) && cluster_ops == 0) {
@ -74,11 +114,11 @@ int64_t GetNextOpOffset(const CowOperation& op, uint32_t cluster_ops) {
}
}
int64_t GetNextDataOffset(const CowOperation& op, uint32_t cluster_ops) {
int64_t GetNextDataOffset(const CowOperationV2& op, uint32_t cluster_ops) {
if (op.type == kCowClusterOp) {
return cluster_ops * sizeof(CowOperation);
return cluster_ops * sizeof(CowOperationV2);
} else if (cluster_ops == 0) {
return sizeof(CowOperation);
return sizeof(CowOperationV2);
} else {
return 0;
}

View file

@ -55,6 +55,7 @@ std::unique_ptr<CowReader> CowReader::CloneCowReader() {
cow->data_loc_ = data_loc_;
cow->block_pos_index_ = block_pos_index_;
cow->is_merge_ = is_merge_;
cow->compression_type_ = compression_type_;
return cow;
}
@ -101,8 +102,44 @@ bool CowReader::Parse(android::base::borrowed_fd fd, std::optional<uint64_t> lab
footer_ = parser.footer();
fd_size_ = parser.fd_size();
last_label_ = parser.last_label();
ops_ = parser.ops();
data_loc_ = parser.data_loc();
ops_ = std::make_shared<std::vector<CowOperation>>(parser.ops()->size());
// Translate the operation buffer from on disk to in memory
for (size_t i = 0; i < parser.ops()->size(); i++) {
const auto& v2_op = parser.ops()->at(i);
auto& new_op = ops_->at(i);
new_op.type = v2_op.type;
new_op.data_length = v2_op.data_length;
if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {
LOG(ERROR) << "Out-of-range new block in COW op: " << v2_op;
return false;
}
new_op.new_block = v2_op.new_block;
uint64_t source_info = v2_op.source;
if (new_op.type != kCowLabelOp) {
source_info &= kCowOpSourceInfoDataMask;
if (source_info != v2_op.source) {
LOG(ERROR) << "Out-of-range source value in COW op: " << v2_op;
return false;
}
}
if (v2_op.compression != kCowCompressNone) {
if (compression_type_ == kCowCompressNone) {
compression_type_ = v2_op.compression;
} else if (compression_type_ != v2_op.compression) {
LOG(ERROR) << "COW has mixed compression types which is not supported;"
<< " previously saw " << compression_type_ << ", got "
<< v2_op.compression << ", op: " << v2_op;
return false;
}
source_info |= kCowOpSourceInfoCompressBit;
}
new_op.source_info = source_info;
}
// If we're resuming a write, we're not ready to merge
if (label.has_value()) return true;
@ -597,14 +634,14 @@ class CowDataStream final : public IByteStream {
size_t remaining_;
};
uint8_t CowReader::GetCompressionType(const CowOperation* op) {
return op->compression;
uint8_t CowReader::GetCompressionType() {
return compression_type_;
}
ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size,
size_t ignore_bytes) {
std::unique_ptr<IDecompressor> decompressor;
switch (GetCompressionType(op)) {
switch (GetCompressionType()) {
case kCowCompressNone:
break;
case kCowCompressGz:
@ -624,7 +661,7 @@ ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_
}
break;
default:
LOG(ERROR) << "Unknown compression type: " << GetCompressionType(op);
LOG(ERROR) << "Unknown compression type: " << GetCompressionType();
return -1;
}

View file

@ -66,18 +66,18 @@ bool CowParserV2::Parse(borrowed_fd fd, const CowHeader& header, std::optional<u
<< sizeof(CowFooter);
return false;
}
if (header_.op_size != sizeof(CowOperation)) {
if (header_.op_size != sizeof(CowOperationV2)) {
LOG(ERROR) << "Operation size unknown, read " << header_.op_size << ", expected "
<< sizeof(CowOperation);
<< sizeof(CowOperationV2);
return false;
}
if (header_.cluster_ops == 1) {
LOG(ERROR) << "Clusters must contain at least two operations to function.";
return false;
}
if (header_.op_size != sizeof(CowOperation)) {
if (header_.op_size != sizeof(CowOperationV2)) {
LOG(ERROR) << "Operation size unknown, read " << header_.op_size << ", expected "
<< sizeof(CowOperation);
<< sizeof(CowOperationV2);
return false;
}
if (header_.cluster_ops == 1) {
@ -123,23 +123,23 @@ bool CowParserV2::ParseOps(borrowed_fd fd, std::optional<uint64_t> label) {
uint64_t data_pos = 0;
if (header_.cluster_ops) {
data_pos = pos + header_.cluster_ops * sizeof(CowOperation);
data_pos = pos + header_.cluster_ops * sizeof(CowOperationV2);
} else {
data_pos = pos + sizeof(CowOperation);
data_pos = pos + sizeof(CowOperationV2);
}
auto ops_buffer = std::make_shared<std::vector<CowOperation>>();
auto ops_buffer = std::make_shared<std::vector<CowOperationV2>>();
uint64_t current_op_num = 0;
uint64_t cluster_ops = header_.cluster_ops ?: 1;
bool done = false;
// Alternating op clusters and data
while (!done) {
uint64_t to_add = std::min(cluster_ops, (fd_size_ - pos) / sizeof(CowOperation));
uint64_t to_add = std::min(cluster_ops, (fd_size_ - pos) / sizeof(CowOperationV2));
if (to_add == 0) break;
ops_buffer->resize(current_op_num + to_add);
if (!android::base::ReadFully(fd, &ops_buffer->data()[current_op_num],
to_add * sizeof(CowOperation))) {
to_add * sizeof(CowOperationV2))) {
PLOG(ERROR) << "read op failed";
return false;
}
@ -150,7 +150,7 @@ bool CowParserV2::ParseOps(borrowed_fd fd, std::optional<uint64_t> label) {
if (current_op.type == kCowXorOp) {
data_loc->insert({current_op.new_block, data_pos});
}
pos += sizeof(CowOperation) + GetNextOpOffset(current_op, header_.cluster_ops);
pos += sizeof(CowOperationV2) + GetNextOpOffset(current_op, header_.cluster_ops);
data_pos += current_op.data_length + GetNextDataOffset(current_op, header_.cluster_ops);
if (current_op.type == kCowClusterOp) {
@ -222,7 +222,7 @@ bool CowParserV2::ParseOps(borrowed_fd fd, std::optional<uint64_t> label) {
<< ops_buffer->size();
return false;
}
if (ops_buffer->size() * sizeof(CowOperation) != footer_->op.ops_size) {
if (ops_buffer->size() * sizeof(CowOperationV2) != footer_->op.ops_size) {
LOG(ERROR) << "ops size does not match ";
return false;
}

View file

@ -33,7 +33,7 @@ class CowParserV2 {
const CowHeader& header() const { return header_; }
const std::optional<CowFooter>& footer() const { return footer_; }
std::shared_ptr<std::vector<CowOperation>> ops() { return ops_; }
std::shared_ptr<std::vector<CowOperationV2>> ops() { return ops_; }
std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc() const { return data_loc_; }
uint64_t fd_size() const { return fd_size_; }
const std::optional<uint64_t>& last_label() const { return last_label_; }
@ -43,7 +43,7 @@ class CowParserV2 {
CowHeader header_ = {};
std::optional<CowFooter> footer_;
std::shared_ptr<std::vector<CowOperation>> ops_;
std::shared_ptr<std::vector<CowOperationV2>> ops_;
std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
uint64_t fd_size_;
std::optional<uint64_t> last_label_;

View file

@ -86,10 +86,9 @@ TEST_F(CowTest, CopyContiguous) {
while (!iter->AtEnd()) {
auto op = iter->Get();
ASSERT_EQ(op->type, kCowCopyOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 10 + i);
ASSERT_EQ(op->source, 1000 + i);
ASSERT_EQ(op->source_info, 1000 + i);
iter->Next();
i += 1;
}
@ -133,10 +132,9 @@ TEST_F(CowTest, ReadWrite) {
auto op = iter->Get();
ASSERT_EQ(op->type, kCowCopyOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 10);
ASSERT_EQ(op->source, 20);
ASSERT_EQ(op->source_info, 20);
std::string sink(data.size(), '\0');
@ -157,20 +155,18 @@ TEST_F(CowTest, ReadWrite) {
// Note: the zero operation gets split into two blocks.
ASSERT_EQ(op->type, kCowZeroOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 51);
ASSERT_EQ(op->source, 0);
ASSERT_EQ(op->source_info, 0);
iter->Next();
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowZeroOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 52);
ASSERT_EQ(op->source, 0);
ASSERT_EQ(op->source_info, 0);
iter->Next();
ASSERT_TRUE(iter->AtEnd());
@ -212,10 +208,9 @@ TEST_F(CowTest, ReadWriteXor) {
auto op = iter->Get();
ASSERT_EQ(op->type, kCowCopyOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 10);
ASSERT_EQ(op->source, 20);
ASSERT_EQ(op->source_info, 20);
std::string sink(data.size(), '\0');
@ -237,20 +232,18 @@ TEST_F(CowTest, ReadWriteXor) {
// Note: the zero operation gets split into two blocks.
ASSERT_EQ(op->type, kCowZeroOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 51);
ASSERT_EQ(op->source, 0);
ASSERT_EQ(op->source_info, 0);
iter->Next();
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowZeroOp);
ASSERT_EQ(op->compression, kCowCompressNone);
ASSERT_EQ(op->data_length, 0);
ASSERT_EQ(op->new_block, 52);
ASSERT_EQ(op->source, 0);
ASSERT_EQ(op->source_info, 0);
iter->Next();
ASSERT_TRUE(iter->AtEnd());
@ -677,7 +670,7 @@ TEST_F(CowTest, AppendLabelSmall) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 3);
ASSERT_EQ(op->source_info, 3);
iter->Next();
@ -730,7 +723,7 @@ TEST_F(CowTest, AppendLabelMissing) {
ASSERT_FALSE(iter->AtEnd());
auto op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 0);
ASSERT_EQ(op->source_info, 0);
iter->Next();
@ -788,7 +781,7 @@ TEST_F(CowTest, AppendExtendedCorrupted) {
ASSERT_FALSE(iter->AtEnd());
auto op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 5);
ASSERT_EQ(op->source_info, 5);
iter->Next();
ASSERT_TRUE(iter->AtEnd());
@ -857,7 +850,7 @@ TEST_F(CowTest, AppendbyLabel) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 4);
ASSERT_EQ(op->source_info, 4);
iter->Next();
@ -875,7 +868,7 @@ TEST_F(CowTest, AppendbyLabel) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 5);
ASSERT_EQ(op->source_info, 5);
iter->Next();
@ -928,7 +921,7 @@ TEST_F(CowTest, ClusterTest) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 4);
ASSERT_EQ(op->source_info, 4);
iter->Next();
@ -953,7 +946,7 @@ TEST_F(CowTest, ClusterTest) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 5);
ASSERT_EQ(op->source_info, 5);
iter->Next();
@ -972,7 +965,7 @@ TEST_F(CowTest, ClusterTest) {
ASSERT_FALSE(iter->AtEnd());
op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 6);
ASSERT_EQ(op->source_info, 6);
iter->Next();
@ -1019,7 +1012,7 @@ TEST_F(CowTest, ClusterAppendTest) {
ASSERT_FALSE(iter->AtEnd());
auto op = iter->Get();
ASSERT_EQ(op->type, kCowLabelOp);
ASSERT_EQ(op->source, 50);
ASSERT_EQ(op->source_info, 50);
iter->Next();

View file

@ -110,7 +110,7 @@ void CowWriterV2::SetupHeaders() {
header_.prefix.minor_version = kCowVersionMinor;
header_.prefix.header_size = sizeof(CowHeader);
header_.footer_size = sizeof(CowFooter);
header_.op_size = sizeof(CowOperation);
header_.op_size = sizeof(CowOperationV2);
header_.block_size = options_.block_size;
header_.num_merge_ops = options_.num_merge_ops;
header_.cluster_ops = options_.cluster_ops;
@ -159,9 +159,9 @@ void CowWriterV2::InitBatchWrites() {
struct iovec* cowop_ptr = cowop_vec_.get();
struct iovec* data_ptr = data_vec_.get();
for (size_t i = 0; i < header_.cluster_ops; i++) {
std::unique_ptr<CowOperation> op = std::make_unique<CowOperation>();
std::unique_ptr<CowOperationV2> op = std::make_unique<CowOperationV2>();
cowop_ptr[i].iov_base = op.get();
cowop_ptr[i].iov_len = sizeof(CowOperation);
cowop_ptr[i].iov_len = sizeof(CowOperationV2);
opbuffer_vec_.push_back(std::move(op));
std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
@ -214,19 +214,19 @@ bool CowWriterV2::Initialize(std::optional<uint64_t> label) {
}
void CowWriterV2::InitPos() {
next_op_pos_ = sizeof(header_) + header_.buffer_size;
cluster_size_ = header_.cluster_ops * sizeof(CowOperation);
next_op_pos_ = sizeof(CowHeader) + header_.buffer_size;
cluster_size_ = header_.cluster_ops * sizeof(CowOperationV2);
if (header_.cluster_ops) {
next_data_pos_ = next_op_pos_ + cluster_size_;
} else {
next_data_pos_ = next_op_pos_ + sizeof(CowOperation);
next_data_pos_ = next_op_pos_ + sizeof(CowOperationV2);
}
current_cluster_size_ = 0;
current_data_size_ = 0;
}
bool CowWriterV2::OpenForWrite() {
// This limitation is tied to the data field size in CowOperation.
// This limitation is tied to the data field size in CowOperationV2.
if (header_.block_size > std::numeric_limits<uint16_t>::max()) {
LOG(ERROR) << "Block size is too large";
return false;
@ -313,7 +313,7 @@ bool CowWriterV2::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_
CHECK(!merge_in_progress_);
for (size_t i = 0; i < num_blocks; i++) {
CowOperation op = {};
CowOperationV2 op = {};
op.type = kCowCopyOp;
op.new_block = new_block + i;
op.source = old_block + i;
@ -399,7 +399,7 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
num_blocks -= pending_blocks;
while (i < size / header_.block_size && pending_blocks) {
CowOperation op = {};
CowOperationV2 op = {};
op.new_block = new_block_start + i;
op.type = type;
if (type == kCowXorOp) {
@ -451,7 +451,7 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
bool CowWriterV2::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
CHECK(!merge_in_progress_);
for (uint64_t i = 0; i < num_blocks; i++) {
CowOperation op = {};
CowOperationV2 op = {};
op.type = kCowZeroOp;
op.new_block = new_block_start + i;
op.source = 0;
@ -462,7 +462,7 @@ bool CowWriterV2::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks)
bool CowWriterV2::EmitLabel(uint64_t label) {
CHECK(!merge_in_progress_);
CowOperation op = {};
CowOperationV2 op = {};
op.type = kCowLabelOp;
op.source = label;
return WriteOperation(op) && Sync();
@ -473,7 +473,7 @@ bool CowWriterV2::EmitSequenceData(size_t num_ops, const uint32_t* data) {
size_t to_add = 0;
size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
while (num_ops > 0) {
CowOperation op = {};
CowOperationV2 op = {};
op.type = kCowSequenceOp;
op.source = next_data_pos_;
to_add = std::min(num_ops, max_ops);
@ -489,16 +489,16 @@ bool CowWriterV2::EmitSequenceData(size_t num_ops, const uint32_t* data) {
}
bool CowWriterV2::EmitCluster() {
CowOperation op = {};
CowOperationV2 op = {};
op.type = kCowClusterOp;
// Next cluster starts after remainder of current cluster and the next data block.
op.source = current_data_size_ + cluster_size_ - current_cluster_size_ - sizeof(CowOperation);
op.source = current_data_size_ + cluster_size_ - current_cluster_size_ - sizeof(CowOperationV2);
return WriteOperation(op);
}
bool CowWriterV2::EmitClusterIfNeeded() {
// If there isn't room for another op and the cluster end op, end the current cluster
if (cluster_size_ && cluster_size_ < current_cluster_size_ + 2 * sizeof(CowOperation)) {
if (cluster_size_ && cluster_size_ < current_cluster_size_ + 2 * sizeof(CowOperationV2)) {
if (!EmitCluster()) return false;
}
return true;
@ -539,7 +539,7 @@ bool CowWriterV2::Finalize() {
extra_cluster = true;
}
footer_.op.ops_size = footer_.op.num_ops * sizeof(CowOperation);
footer_.op.ops_size = footer_.op.num_ops * sizeof(CowOperationV2);
if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
PLOG(ERROR) << "Failed to seek to footer position.";
return false;
@ -611,9 +611,9 @@ bool CowWriterV2::FlushCluster() {
if (op_vec_index_) {
ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
if (ret != (op_vec_index_ * sizeof(CowOperation))) {
PLOG(ERROR) << "pwritev failed for CowOperation. Expected: "
<< (op_vec_index_ * sizeof(CowOperation));
if (ret != (op_vec_index_ * sizeof(CowOperationV2))) {
PLOG(ERROR) << "pwritev failed for CowOperationV2. Expected: "
<< (op_vec_index_ * sizeof(CowOperationV2));
return false;
}
}
@ -635,15 +635,16 @@ bool CowWriterV2::FlushCluster() {
return true;
}
bool CowWriterV2::WriteOperation(const CowOperation& op, const void* data, size_t size) {
bool CowWriterV2::WriteOperation(const CowOperationV2& op, const void* data, size_t size) {
if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op)) ||
!EnsureSpaceAvailable(next_data_pos_ + size)) {
return false;
}
if (batch_write_) {
CowOperation* cow_op = reinterpret_cast<CowOperation*>(cowop_vec_[op_vec_index_].iov_base);
std::memcpy(cow_op, &op, sizeof(CowOperation));
CowOperationV2* cow_op =
reinterpret_cast<CowOperationV2*>(cowop_vec_[op_vec_index_].iov_base);
std::memcpy(cow_op, &op, sizeof(CowOperationV2));
op_vec_index_ += 1;
if (data != nullptr && size > 0) {
@ -681,7 +682,7 @@ bool CowWriterV2::WriteOperation(const CowOperation& op, const void* data, size_
return EmitClusterIfNeeded();
}
void CowWriterV2::AddOperation(const CowOperation& op) {
void CowWriterV2::AddOperation(const CowOperationV2& op) {
footer_.op.num_ops++;
if (op.type == kCowClusterOp) {
@ -693,7 +694,7 @@ void CowWriterV2::AddOperation(const CowOperation& op) {
}
next_data_pos_ += op.data_length + GetNextDataOffset(op, header_.cluster_ops);
next_op_pos_ += sizeof(CowOperation) + GetNextOpOffset(op, header_.cluster_ops);
next_op_pos_ += sizeof(CowOperationV2) + GetNextOpOffset(op, header_.cluster_ops);
}
bool CowWriterV2::WriteRawData(const void* data, const size_t size) {

View file

@ -50,8 +50,8 @@ class CowWriterV2 : public CowWriterBase {
bool OpenForAppend(uint64_t label);
bool GetDataPos(uint64_t* pos);
bool WriteRawData(const void* data, size_t size);
bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
void AddOperation(const CowOperation& op);
bool WriteOperation(const CowOperationV2& op, const void* data = nullptr, size_t size = 0);
void AddOperation(const CowOperationV2& op);
void InitPos();
void InitBatchWrites();
void InitWorkers();
@ -84,7 +84,7 @@ class CowWriterV2 : public CowWriterBase {
std::vector<std::basic_string<uint8_t>> compressed_buf_;
std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
std::vector<std::unique_ptr<CowOperation>> opbuffer_vec_;
std::vector<std::unique_ptr<CowOperationV2>> opbuffer_vec_;
std::vector<std::unique_ptr<uint8_t[]>> databuffer_vec_;
std::unique_ptr<struct iovec[]> cowop_vec_;
int op_vec_index_ = 0;