snapuserd: Add support for Xor ops in snapuserd
This adds the ability to read the blocks corresponding to xor operations from snapuserd. Xor Operations should be treated the same as copy operations during the merge, but before their data is written to disk, it must be xor'ed against the operation's data. For the purposes of readahead, this acts as a copy op. Post readahead, it acts more like a replace op. Change-Id: I7d74bbdd43bfc5277ef4c8aade57dd375be9180a Bug: 177104308 Test: vts_libsnapshot_test and cow_snapuserd_test
This commit is contained in:
parent
35bbf11ec1
commit
e278cee742
7 changed files with 209 additions and 53 deletions
|
@ -221,7 +221,7 @@ class PartialSink : public MemoryByteSink {
|
|||
|
||||
private:
|
||||
size_t ignore_start_;
|
||||
char discard_[4096];
|
||||
char discard_[BLOCK_SZ];
|
||||
};
|
||||
|
||||
ssize_t CompressedSnapshotReader::ReadBlock(uint64_t chunk, IByteSink* sink, size_t start_offset,
|
||||
|
@ -277,6 +277,29 @@ ssize_t CompressedSnapshotReader::ReadBlock(uint64_t chunk, IByteSink* sink, siz
|
|||
errno = EIO;
|
||||
return -1;
|
||||
}
|
||||
} else if (op->type == kCowXorOp) {
|
||||
borrowed_fd fd = GetSourceFd();
|
||||
if (fd < 0) {
|
||||
// GetSourceFd sets errno.
|
||||
return -1;
|
||||
}
|
||||
|
||||
off64_t offset = op->source + start_offset;
|
||||
char data[BLOCK_SZ];
|
||||
if (!android::base::ReadFullyAtOffset(fd, &data, bytes_to_read, offset)) {
|
||||
PLOG(ERROR) << "read " << *source_device_;
|
||||
// ReadFullyAtOffset sets errno.
|
||||
return -1;
|
||||
}
|
||||
PartialSink partial_sink(buffer, bytes_to_read, start_offset);
|
||||
if (!cow_->ReadData(*op, &partial_sink)) {
|
||||
LOG(ERROR) << "CompressedSnapshotReader failed to read xor op";
|
||||
errno = EIO;
|
||||
return -1;
|
||||
}
|
||||
for (size_t i = 0; i < bytes_to_read; i++) {
|
||||
((char*)buffer)[i] ^= data[i];
|
||||
}
|
||||
} else {
|
||||
LOG(ERROR) << "CompressedSnapshotReader unknown op type: " << uint32_t(op->type);
|
||||
errno = EINVAL;
|
||||
|
|
|
@ -63,7 +63,9 @@ class OfflineSnapshotTest : public ::testing::Test {
|
|||
|
||||
void WriteCow(ISnapshotWriter* writer) {
|
||||
std::string new_block = MakeNewBlockString();
|
||||
std::string xor_block = MakeXorBlockString();
|
||||
|
||||
ASSERT_TRUE(writer->AddXorBlocks(1, xor_block.data(), xor_block.size(), 0, kBlockSize / 2));
|
||||
ASSERT_TRUE(writer->AddCopy(3, 0));
|
||||
ASSERT_TRUE(writer->AddRawBlocks(5, new_block.data(), new_block.size()));
|
||||
ASSERT_TRUE(writer->AddZeroBlocks(7, 2));
|
||||
|
@ -75,7 +77,7 @@ class OfflineSnapshotTest : public ::testing::Test {
|
|||
ASSERT_NE(reader, nullptr);
|
||||
|
||||
// Test that unchanged blocks are not modified.
|
||||
std::unordered_set<size_t> changed_blocks = {3, 5, 7, 8};
|
||||
std::unordered_set<size_t> changed_blocks = {1, 3, 5, 7, 8};
|
||||
for (size_t i = 0; i < kBlockCount; i++) {
|
||||
if (changed_blocks.count(i)) {
|
||||
continue;
|
||||
|
@ -88,6 +90,17 @@ class OfflineSnapshotTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
// Test that we can read back our modified blocks.
|
||||
std::string data(kBlockSize, 0);
|
||||
std::string offsetblock = base_blocks_[0].substr(kBlockSize / 2, kBlockSize / 2) +
|
||||
base_blocks_[1].substr(0, kBlockSize / 2);
|
||||
ASSERT_EQ(offsetblock.size(), kBlockSize);
|
||||
ASSERT_EQ(reader->Seek(1 * kBlockSize, SEEK_SET), 1 * kBlockSize);
|
||||
ASSERT_EQ(reader->Read(data.data(), data.size()), kBlockSize);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
data[i] = (char)~(data[i]);
|
||||
}
|
||||
ASSERT_EQ(data, offsetblock);
|
||||
|
||||
std::string block(kBlockSize, 0);
|
||||
ASSERT_EQ(reader->Seek(3 * kBlockSize, SEEK_SET), 3 * kBlockSize);
|
||||
ASSERT_EQ(reader->Read(block.data(), block.size()), kBlockSize);
|
||||
|
@ -141,6 +154,12 @@ class OfflineSnapshotTest : public ::testing::Test {
|
|||
return new_block;
|
||||
}
|
||||
|
||||
std::string MakeXorBlockString() {
|
||||
std::string data(100, -1);
|
||||
data.resize(kBlockSize, 0);
|
||||
return data;
|
||||
}
|
||||
|
||||
std::unique_ptr<TemporaryFile> base_;
|
||||
std::unique_ptr<TemporaryFile> cow_;
|
||||
std::vector<std::string> base_blocks_;
|
||||
|
|
|
@ -259,7 +259,7 @@ void CowSnapuserdTest::StartSnapuserdDaemon() {
|
|||
void CowSnapuserdTest::CreateBaseDevice() {
|
||||
unique_fd rnd_fd;
|
||||
|
||||
total_base_size_ = (size_ * 4);
|
||||
total_base_size_ = (size_ * 5);
|
||||
base_fd_ = CreateTempFile("base_device", total_base_size_);
|
||||
ASSERT_GE(base_fd_, 0);
|
||||
|
||||
|
@ -304,6 +304,11 @@ void CowSnapuserdTest::ReadSnapshotDeviceAndValidate() {
|
|||
offset += size_;
|
||||
ASSERT_EQ(ReadFullyAtOffset(snapshot_fd, snapuserd_buffer.get(), size_, offset), true);
|
||||
ASSERT_EQ(memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + (size_ * 3), size_), 0);
|
||||
|
||||
// XOR
|
||||
offset += size_;
|
||||
ASSERT_EQ(ReadFullyAtOffset(snapshot_fd, snapuserd_buffer.get(), size_, offset), true);
|
||||
ASSERT_EQ(memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + (size_ * 4), size_), 0);
|
||||
}
|
||||
|
||||
void CowSnapuserdTest::CreateCowDeviceWithCopyOverlap_2() {
|
||||
|
@ -428,9 +433,10 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOpsInverted() {
|
|||
ASSERT_TRUE(writer.Initialize(cow_system_->fd));
|
||||
|
||||
size_t num_blocks = size_ / options.block_size;
|
||||
size_t blk_end_copy = num_blocks * 2;
|
||||
size_t blk_end_copy = num_blocks * 3;
|
||||
size_t source_blk = num_blocks - 1;
|
||||
size_t blk_src_copy = blk_end_copy - 1;
|
||||
uint16_t xor_offset = 5;
|
||||
|
||||
size_t x = num_blocks;
|
||||
while (1) {
|
||||
|
@ -443,6 +449,11 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOpsInverted() {
|
|||
blk_src_copy -= 1;
|
||||
}
|
||||
|
||||
for (size_t i = num_blocks; i > 0; i--) {
|
||||
ASSERT_TRUE(writer.AddXorBlocks(num_blocks + i - 1,
|
||||
&random_buffer_1_.get()[options.block_size * (i - 1)],
|
||||
options.block_size, 2 * num_blocks + i - 1, xor_offset));
|
||||
}
|
||||
// Flush operations
|
||||
ASSERT_TRUE(writer.Finalize());
|
||||
// Construct the buffer required for validation
|
||||
|
@ -451,7 +462,11 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOpsInverted() {
|
|||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
|
||||
true);
|
||||
// Merged Buffer
|
||||
memmove(orig_buffer_.get(), (char*)orig_buffer_.get() + size_, size_);
|
||||
memmove(orig_buffer_.get(), (char*)orig_buffer_.get() + 2 * size_, size_);
|
||||
memmove(orig_buffer_.get() + size_, (char*)orig_buffer_.get() + 2 * size_ + xor_offset, size_);
|
||||
for (int i = 0; i < size_; i++) {
|
||||
orig_buffer_.get()[size_ + i] ^= random_buffer_1_.get()[i];
|
||||
}
|
||||
}
|
||||
|
||||
void CowSnapuserdTest::CreateCowDeviceOrderedOps() {
|
||||
|
@ -473,6 +488,7 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOps() {
|
|||
|
||||
offset += 1_MiB;
|
||||
}
|
||||
memset(random_buffer_1_.get(), 0, size_);
|
||||
|
||||
CowOptions options;
|
||||
options.compression = "gz";
|
||||
|
@ -483,7 +499,8 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOps() {
|
|||
size_t num_blocks = size_ / options.block_size;
|
||||
size_t x = num_blocks;
|
||||
size_t source_blk = 0;
|
||||
size_t blk_src_copy = num_blocks;
|
||||
size_t blk_src_copy = 2 * num_blocks;
|
||||
uint16_t xor_offset = 5;
|
||||
|
||||
while (1) {
|
||||
ASSERT_TRUE(writer.AddCopy(source_blk, blk_src_copy));
|
||||
|
@ -496,6 +513,8 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOps() {
|
|||
blk_src_copy += 1;
|
||||
}
|
||||
|
||||
ASSERT_TRUE(writer.AddXorBlocks(num_blocks, random_buffer_1_.get(), size_, 2 * num_blocks,
|
||||
xor_offset));
|
||||
// Flush operations
|
||||
ASSERT_TRUE(writer.Finalize());
|
||||
// Construct the buffer required for validation
|
||||
|
@ -504,7 +523,11 @@ void CowSnapuserdTest::CreateCowDeviceOrderedOps() {
|
|||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
|
||||
true);
|
||||
// Merged Buffer
|
||||
memmove(orig_buffer_.get(), (char*)orig_buffer_.get() + size_, size_);
|
||||
memmove(orig_buffer_.get(), (char*)orig_buffer_.get() + 2 * size_, size_);
|
||||
memmove(orig_buffer_.get() + size_, (char*)orig_buffer_.get() + 2 * size_ + xor_offset, size_);
|
||||
for (int i = 0; i < size_; i++) {
|
||||
orig_buffer_.get()[size_ + i] ^= random_buffer_1_.get()[i];
|
||||
}
|
||||
}
|
||||
|
||||
void CowSnapuserdTest::CreateCowDevice() {
|
||||
|
@ -538,6 +561,17 @@ void CowSnapuserdTest::CreateCowDevice() {
|
|||
size_t source_blk = num_blocks - 1;
|
||||
size_t blk_src_copy = blk_end_copy - 1;
|
||||
|
||||
uint32_t sequence[num_blocks * 2];
|
||||
// Sequence for Copy ops
|
||||
for (int i = 0; i < num_blocks; i++) {
|
||||
sequence[i] = num_blocks - 1 - i;
|
||||
}
|
||||
// Sequence for Xor ops
|
||||
for (int i = 0; i < num_blocks; i++) {
|
||||
sequence[num_blocks + i] = 5 * num_blocks - 1 - i;
|
||||
}
|
||||
ASSERT_TRUE(writer.AddSequenceData(2 * num_blocks, sequence));
|
||||
|
||||
size_t x = num_blocks;
|
||||
while (1) {
|
||||
ASSERT_TRUE(writer.AddCopy(source_blk, blk_src_copy));
|
||||
|
@ -563,6 +597,11 @@ void CowSnapuserdTest::CreateCowDevice() {
|
|||
|
||||
ASSERT_TRUE(writer.AddRawBlocks(blk_random2_replace_start, random_buffer_1_.get(), size_));
|
||||
|
||||
size_t blk_xor_start = blk_random2_replace_start + num_blocks;
|
||||
size_t xor_offset = BLOCK_SZ / 2;
|
||||
ASSERT_TRUE(writer.AddXorBlocks(blk_xor_start, random_buffer_1_.get(), size_, num_blocks,
|
||||
xor_offset));
|
||||
|
||||
// Flush operations
|
||||
ASSERT_TRUE(writer.Finalize());
|
||||
// Construct the buffer required for validation
|
||||
|
@ -572,6 +611,13 @@ void CowSnapuserdTest::CreateCowDevice() {
|
|||
memcpy((char*)orig_buffer_.get() + size_, random_buffer_1_.get(), size_);
|
||||
memcpy((char*)orig_buffer_.get() + (size_ * 2), (void*)zero_buffer.c_str(), size_);
|
||||
memcpy((char*)orig_buffer_.get() + (size_ * 3), random_buffer_1_.get(), size_);
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, &orig_buffer_.get()[size_ * 4], size_,
|
||||
size_ + xor_offset),
|
||||
true);
|
||||
for (int i = 0; i < size_; i++) {
|
||||
orig_buffer_.get()[(size_ * 4) + i] =
|
||||
(uint8_t)(orig_buffer_.get()[(size_ * 4) + i] ^ random_buffer_1_.get()[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void CowSnapuserdTest::InitCowDevice() {
|
||||
|
|
|
@ -338,7 +338,7 @@ bool Snapuserd::ReadMetadata() {
|
|||
CowHeader header;
|
||||
CowOptions options;
|
||||
bool metadata_found = false;
|
||||
int replace_ops = 0, zero_ops = 0, copy_ops = 0;
|
||||
int replace_ops = 0, zero_ops = 0, copy_ops = 0, xor_ops = 0;
|
||||
|
||||
SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
|
||||
|
||||
|
@ -443,12 +443,12 @@ bool Snapuserd::ReadMetadata() {
|
|||
std::vector<const CowOperation*> vec;
|
||||
std::set<uint64_t> dest_blocks;
|
||||
std::set<uint64_t> source_blocks;
|
||||
size_t pending_copy_ops = exceptions_per_area_ - num_ops;
|
||||
uint64_t total_copy_ops = reader_->get_num_ordered_ops_to_merge();
|
||||
size_t pending_ordered_ops = exceptions_per_area_ - num_ops;
|
||||
uint64_t total_ordered_ops = reader_->get_num_ordered_ops_to_merge();
|
||||
|
||||
SNAP_LOG(DEBUG) << " Processing copy-ops at Area: " << vec_.size()
|
||||
<< " Number of replace/zero ops completed in this area: " << num_ops
|
||||
<< " Pending copy ops for this area: " << pending_copy_ops;
|
||||
<< " Pending copy ops for this area: " << pending_ordered_ops;
|
||||
|
||||
while (!cowop_rm_iter->Done()) {
|
||||
do {
|
||||
|
@ -501,24 +501,34 @@ bool Snapuserd::ReadMetadata() {
|
|||
// the merge of operations are done based on the ops present
|
||||
// in the file.
|
||||
//===========================================================
|
||||
uint64_t block_source = cow_op->source;
|
||||
uint64_t block_offset = 0;
|
||||
if (cow_op->type == kCowXorOp) {
|
||||
block_source /= BLOCK_SZ;
|
||||
block_offset = cow_op->source % BLOCK_SZ;
|
||||
}
|
||||
if (prev_id.has_value()) {
|
||||
if (dest_blocks.count(cow_op->new_block) || source_blocks.count(cow_op->source)) {
|
||||
if (dest_blocks.count(cow_op->new_block) || source_blocks.count(block_source) ||
|
||||
(block_offset > 0 && source_blocks.count(block_source + 1))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
metadata_found = true;
|
||||
pending_copy_ops -= 1;
|
||||
pending_ordered_ops -= 1;
|
||||
vec.push_back(cow_op);
|
||||
dest_blocks.insert(cow_op->source);
|
||||
dest_blocks.insert(block_source);
|
||||
if (block_offset > 0) {
|
||||
dest_blocks.insert(block_source + 1);
|
||||
}
|
||||
source_blocks.insert(cow_op->new_block);
|
||||
prev_id = cow_op->new_block;
|
||||
cowop_rm_iter->Next();
|
||||
} while (!cowop_rm_iter->Done() && pending_copy_ops);
|
||||
} while (!cowop_rm_iter->Done() && pending_ordered_ops);
|
||||
|
||||
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
|
||||
SNAP_LOG(DEBUG) << "Batch Merge copy-ops of size: " << vec.size()
|
||||
SNAP_LOG(DEBUG) << "Batch Merge copy-ops/xor-ops of size: " << vec.size()
|
||||
<< " Area: " << vec_.size() << " Area offset: " << offset
|
||||
<< " Pending-copy-ops in this area: " << pending_copy_ops;
|
||||
<< " Pending-ordered-ops in this area: " << pending_ordered_ops;
|
||||
|
||||
for (size_t i = 0; i < vec.size(); i++) {
|
||||
struct disk_exception* de =
|
||||
|
@ -532,13 +542,18 @@ bool Snapuserd::ReadMetadata() {
|
|||
chunk_vec_.push_back(std::make_pair(ChunkToSector(data_chunk_id), cow_op));
|
||||
offset += sizeof(struct disk_exception);
|
||||
num_ops += 1;
|
||||
copy_ops++;
|
||||
if (cow_op->type == kCowCopyOp) {
|
||||
copy_ops++;
|
||||
} else { // it->second->type == kCowXorOp
|
||||
xor_ops++;
|
||||
}
|
||||
|
||||
if (read_ahead_feature_) {
|
||||
read_ahead_ops_.push_back(cow_op);
|
||||
}
|
||||
|
||||
SNAP_LOG(DEBUG) << num_ops << ":"
|
||||
<< " Copy-op: "
|
||||
<< " Ordered-op: "
|
||||
<< " Old-chunk: " << de->old_chunk << " New-chunk: " << de->new_chunk;
|
||||
|
||||
if (num_ops == exceptions_per_area_) {
|
||||
|
@ -558,22 +573,22 @@ bool Snapuserd::ReadMetadata() {
|
|||
SNAP_LOG(DEBUG) << "ReadMetadata() completed; Number of Areas: " << vec_.size();
|
||||
}
|
||||
|
||||
if (!(pending_copy_ops == 0)) {
|
||||
SNAP_LOG(ERROR)
|
||||
<< "Invalid pending_copy_ops: expected: 0 found: " << pending_copy_ops;
|
||||
if (!(pending_ordered_ops == 0)) {
|
||||
SNAP_LOG(ERROR) << "Invalid pending_ordered_ops: expected: 0 found: "
|
||||
<< pending_ordered_ops;
|
||||
return false;
|
||||
}
|
||||
pending_copy_ops = exceptions_per_area_;
|
||||
pending_ordered_ops = exceptions_per_area_;
|
||||
}
|
||||
|
||||
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
|
||||
total_copy_ops -= 1;
|
||||
total_ordered_ops -= 1;
|
||||
/*
|
||||
* Split the number of ops based on the size of read-ahead buffer
|
||||
* region. We need to ensure that kernel doesn't issue IO on blocks
|
||||
* which are not read by the read-ahead thread.
|
||||
*/
|
||||
if (read_ahead_feature_ && (total_copy_ops % num_ra_ops_per_iter == 0)) {
|
||||
if (read_ahead_feature_ && (total_ordered_ops % num_ra_ops_per_iter == 0)) {
|
||||
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
|
||||
}
|
||||
}
|
||||
|
@ -602,8 +617,8 @@ bool Snapuserd::ReadMetadata() {
|
|||
SNAP_LOG(INFO) << "ReadMetadata completed. Final-chunk-id: " << data_chunk_id
|
||||
<< " Num Sector: " << ChunkToSector(data_chunk_id)
|
||||
<< " Replace-ops: " << replace_ops << " Zero-ops: " << zero_ops
|
||||
<< " Copy-ops: " << copy_ops << " Areas: " << vec_.size()
|
||||
<< " Num-ops-merged: " << header.num_merge_ops
|
||||
<< " Copy-ops: " << copy_ops << " Xor-ops: " << xor_ops
|
||||
<< " Areas: " << vec_.size() << " Num-ops-merged: " << header.num_merge_ops
|
||||
<< " Total-data-ops: " << reader_->get_num_total_data_ops();
|
||||
|
||||
// Total number of sectors required for creating dm-user device
|
||||
|
|
|
@ -143,7 +143,7 @@ class ReadAheadThread {
|
|||
}
|
||||
|
||||
bool ReadAheadIOStart();
|
||||
void PrepareReadAhead(uint64_t* source_block, int* pending_ops, std::vector<uint64_t>& blocks);
|
||||
void PrepareReadAhead(uint64_t* source_offset, int* pending_ops, std::vector<uint64_t>& blocks);
|
||||
bool ReconstructDataFromCow();
|
||||
void CheckOverlap(const CowOperation* cow_op);
|
||||
|
||||
|
@ -201,7 +201,9 @@ class WorkerThread {
|
|||
// Processing COW operations
|
||||
bool ProcessCowOp(const CowOperation* cow_op);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op);
|
||||
// Handles Copy and Xor
|
||||
bool ProcessCopyOp(const CowOperation* cow_op);
|
||||
bool ProcessXorOp(const CowOperation* cow_op);
|
||||
bool ProcessZeroOp();
|
||||
|
||||
bool ReadFromBaseDevice(const CowOperation* cow_op);
|
||||
|
@ -220,6 +222,7 @@ class WorkerThread {
|
|||
|
||||
std::unique_ptr<CowReader> reader_;
|
||||
BufferSink bufsink_;
|
||||
XorSink xorsink_;
|
||||
|
||||
std::string cow_device_;
|
||||
std::string backing_store_device_;
|
||||
|
|
|
@ -172,23 +172,36 @@ ReadAheadThread::ReadAheadThread(const std::string& cow_device, const std::strin
|
|||
}
|
||||
|
||||
void ReadAheadThread::CheckOverlap(const CowOperation* cow_op) {
|
||||
if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(cow_op->source)) {
|
||||
uint64_t source_block = cow_op->source;
|
||||
uint64_t source_offset = 0;
|
||||
if (cow_op->type == kCowXorOp) {
|
||||
source_block /= BLOCK_SZ;
|
||||
source_offset = cow_op->source % BLOCK_SZ;
|
||||
}
|
||||
if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
|
||||
(source_offset > 0 && source_blocks_.count(source_block + 1))) {
|
||||
overlap_ = true;
|
||||
}
|
||||
|
||||
dest_blocks_.insert(cow_op->source);
|
||||
dest_blocks_.insert(source_block);
|
||||
if (source_offset > 0) {
|
||||
dest_blocks_.insert(source_block + 1);
|
||||
}
|
||||
source_blocks_.insert(cow_op->new_block);
|
||||
}
|
||||
|
||||
void ReadAheadThread::PrepareReadAhead(uint64_t* source_block, int* pending_ops,
|
||||
void ReadAheadThread::PrepareReadAhead(uint64_t* source_offset, int* pending_ops,
|
||||
std::vector<uint64_t>& blocks) {
|
||||
int num_ops = *pending_ops;
|
||||
int nr_consecutive = 0;
|
||||
|
||||
if (!RAIterDone() && num_ops) {
|
||||
// Get the first block
|
||||
// Get the first block with offset
|
||||
const CowOperation* cow_op = GetRAOpIter();
|
||||
*source_block = cow_op->source;
|
||||
*source_offset = cow_op->source;
|
||||
if (cow_op->type == kCowCopyOp) {
|
||||
*source_offset *= BLOCK_SZ;
|
||||
}
|
||||
RAIterNext();
|
||||
num_ops -= 1;
|
||||
nr_consecutive = 1;
|
||||
|
@ -203,7 +216,11 @@ void ReadAheadThread::PrepareReadAhead(uint64_t* source_block, int* pending_ops,
|
|||
*/
|
||||
while (!RAIterDone() && num_ops) {
|
||||
const CowOperation* op = GetRAOpIter();
|
||||
if (op->source != (*source_block - nr_consecutive)) {
|
||||
uint64_t next_offset = op->source;
|
||||
if (cow_op->type == kCowCopyOp) {
|
||||
next_offset *= BLOCK_SZ;
|
||||
}
|
||||
if (next_offset != (*source_offset - nr_consecutive * BLOCK_SZ)) {
|
||||
break;
|
||||
}
|
||||
nr_consecutive += 1;
|
||||
|
@ -312,10 +329,10 @@ bool ReadAheadThread::ReadAheadIOStart() {
|
|||
source_blocks_.clear();
|
||||
|
||||
while (true) {
|
||||
uint64_t source_block;
|
||||
uint64_t source_offset;
|
||||
int linear_blocks;
|
||||
|
||||
PrepareReadAhead(&source_block, &num_ops, blocks);
|
||||
PrepareReadAhead(&source_offset, &num_ops, blocks);
|
||||
linear_blocks = blocks.size();
|
||||
if (linear_blocks == 0) {
|
||||
// No more blocks to read
|
||||
|
@ -324,7 +341,7 @@ bool ReadAheadThread::ReadAheadIOStart() {
|
|||
}
|
||||
|
||||
// Get the first block in the consecutive set of blocks
|
||||
source_block = source_block + 1 - linear_blocks;
|
||||
source_offset = source_offset - (linear_blocks - 1) * BLOCK_SZ;
|
||||
size_t io_size = (linear_blocks * BLOCK_SZ);
|
||||
num_ops -= linear_blocks;
|
||||
total_blocks_merged += linear_blocks;
|
||||
|
@ -358,10 +375,12 @@ bool ReadAheadThread::ReadAheadIOStart() {
|
|||
// Read from the base device consecutive set of blocks in one shot
|
||||
if (!android::base::ReadFullyAtOffset(backing_store_fd_,
|
||||
(char*)read_ahead_buffer_ + buffer_offset, io_size,
|
||||
source_block * BLOCK_SZ)) {
|
||||
SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
|
||||
<< "at block :" << source_block << " buffer_offset : " << buffer_offset
|
||||
<< " io_size : " << io_size << " buf-addr : " << read_ahead_buffer_;
|
||||
source_offset)) {
|
||||
SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
|
||||
<< backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
|
||||
<< " offset :" << source_offset % BLOCK_SZ
|
||||
<< " buffer_offset : " << buffer_offset << " io_size : " << io_size
|
||||
<< " buf-addr : " << read_ahead_buffer_;
|
||||
|
||||
snapuserd_->ReadAheadIOFailed();
|
||||
return false;
|
||||
|
|
|
@ -183,10 +183,19 @@ bool WorkerThread::ReadFromBaseDevice(const CowOperation* cow_op) {
|
|||
}
|
||||
SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
|
||||
<< " Source: " << cow_op->source;
|
||||
if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
|
||||
cow_op->source * BLOCK_SZ)) {
|
||||
SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
|
||||
<< "at block :" << cow_op->source;
|
||||
uint64_t offset = cow_op->source;
|
||||
if (cow_op->type == kCowCopyOp) {
|
||||
offset *= BLOCK_SZ;
|
||||
}
|
||||
if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
|
||||
std::string op;
|
||||
if (cow_op->type == kCowCopyOp)
|
||||
op = "Copy-op";
|
||||
else {
|
||||
op = "Xor-op";
|
||||
}
|
||||
SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
|
||||
<< "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -221,6 +230,23 @@ bool WorkerThread::ProcessCopyOp(const CowOperation* cow_op) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool WorkerThread::ProcessXorOp(const CowOperation* cow_op) {
|
||||
if (!GetReadAheadPopulatedBuffer(cow_op)) {
|
||||
SNAP_LOG(DEBUG) << " GetReadAheadPopulatedBuffer failed..."
|
||||
<< " new_block: " << cow_op->new_block;
|
||||
if (!ReadFromBaseDevice(cow_op)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
xorsink_.Reset();
|
||||
if (!reader_->ReadData(*cow_op, &xorsink_)) {
|
||||
SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WorkerThread::ProcessZeroOp() {
|
||||
// Zero out the entire block
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
|
@ -252,6 +278,10 @@ bool WorkerThread::ProcessCowOp(const CowOperation* cow_op) {
|
|||
return ProcessCopyOp(cow_op);
|
||||
}
|
||||
|
||||
case kCowXorOp: {
|
||||
return ProcessXorOp(cow_op);
|
||||
}
|
||||
|
||||
default: {
|
||||
SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
|
||||
}
|
||||
|
@ -503,10 +533,10 @@ loff_t WorkerThread::GetMergeStartOffset(void* merged_buffer, void* unmerged_buf
|
|||
}
|
||||
|
||||
int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
|
||||
int unmerged_exceptions, bool* copy_op, bool* commit) {
|
||||
int unmerged_exceptions, bool* ordered_op, bool* commit) {
|
||||
int merged_ops_cur_iter = 0;
|
||||
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
|
||||
*copy_op = false;
|
||||
*ordered_op = false;
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
|
||||
|
||||
// Find the operations which are merged in this cycle.
|
||||
|
@ -544,9 +574,9 @@ int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffe
|
|||
}
|
||||
const CowOperation* cow_op = it->second;
|
||||
|
||||
if (snapuserd_->IsReadAheadFeaturePresent() && cow_op->type == kCowCopyOp) {
|
||||
*copy_op = true;
|
||||
// Every single copy operation has to come from read-ahead
|
||||
if (snapuserd_->IsReadAheadFeaturePresent() && IsOrderedOp(*cow_op)) {
|
||||
*ordered_op = true;
|
||||
// Every single ordered operation has to come from read-ahead
|
||||
// cache.
|
||||
if (read_ahead_buffer_map.find(cow_op->new_block) == read_ahead_buffer_map.end()) {
|
||||
SNAP_LOG(ERROR)
|
||||
|
@ -590,7 +620,7 @@ int WorkerThread::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffe
|
|||
bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
|
||||
uint32_t stride = exceptions_per_area_ + 1;
|
||||
const std::vector<std::unique_ptr<uint8_t[]>>& vec = snapuserd_->GetMetadataVec();
|
||||
bool copy_op = false;
|
||||
bool ordered_op = false;
|
||||
bool commit = false;
|
||||
|
||||
// ChunkID to vector index
|
||||
|
@ -615,7 +645,7 @@ bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
|
|||
}
|
||||
|
||||
int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec[divresult.quot].get(), offset,
|
||||
unmerged_exceptions, ©_op, &commit);
|
||||
unmerged_exceptions, &ordered_op, &commit);
|
||||
|
||||
// There should be at least one operation merged in this cycle
|
||||
if (!(merged_ops_cur_iter > 0)) {
|
||||
|
@ -623,7 +653,7 @@ bool WorkerThread::ProcessMergeComplete(chunk_t chunk, void* buffer) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (copy_op) {
|
||||
if (ordered_op) {
|
||||
if (commit) {
|
||||
// Push the flushing logic to read-ahead thread so that merge thread
|
||||
// can make forward progress. Sync will happen in the background
|
||||
|
@ -852,6 +882,7 @@ void WorkerThread::InitializeBufsink() {
|
|||
|
||||
bool WorkerThread::RunThread() {
|
||||
InitializeBufsink();
|
||||
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
|
||||
|
||||
if (!InitializeFds()) {
|
||||
return false;
|
||||
|
|
Loading…
Reference in a new issue