snapuserd: Fallback to synchronous I/O if any errors observed during
async merge.

If there are any I/O errors during the async merge, we retry the I/O on
the synchronous I/O path. For this to happen, we reset the iterator so
that we replay the blocks which were partially completed during the
async merge. Furthermore, we disable the async merge and continue the
I/O on the synchronous path.

Additionally, cut the queue depth down to 8 so that the number of async
offloads decreases. We don't want a big queue depth with async offload.

Bug: 220991038
Test: Instrument the code to fail the async I/Os randomly and make sure
the merge completes. Instrumentation was done on both the readahead and
merge code paths.

Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: I0db6d0f46054ca5b8423201a598c726b2c3d21ac
parent 7a7c8e4607
commit 325e2acbb9
7 changed files with 247 additions and 111 deletions
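Before the per-file hunks, here is the shape of the fallback in isolation. This is a sketch only, with simplified, hypothetical names (OpIter, Merger, MergeAsync, MergeSync, blocks_in_flight_) and none of the io_uring plumbing; it is not the snapuserd code itself. It shows the pattern the diff implements: try the async merge first; on failure, rewind the shared iterator over the blocks that were batched but not committed, disable async, and replay the same window synchronously.

#include <cstdint>
#include <memory>

// Hypothetical stand-ins for ICowOpIter/Worker; names are illustrative only.
struct OpIter {
    virtual ~OpIter() {}
    virtual bool Done() = 0;   // nothing left to read forward
    virtual bool RDone() = 0;  // nothing left to read backward
    virtual void Next() = 0;
    virtual void Prev() = 0;
};

class Merger {
  public:
    bool Merge() {
        bool retry = false;
        if (merge_async_) {
            if (!MergeAsync()) {
                // The async batch failed part-way. Rewind the iterator over the
                // ops that were queued but not committed, then disable async.
                while (blocks_in_flight_ && !iter_->RDone()) {
                    iter_->Prev();
                    blocks_in_flight_ -= 1;
                }
                merge_async_ = false;
                retry = true;
            }
        }
        // Devices without async support, or a failed async attempt, take the
        // synchronous path, which replays the rewound window from the iterator.
        if (retry || !merge_async_) {
            return MergeSync();
        }
        return true;
    }

  private:
    // Stubs for illustration; the real paths submit io_uring SQEs or pwrite().
    bool MergeAsync() { return false; }
    bool MergeSync() { return true; }

    std::unique_ptr<OpIter> iter_;
    uint64_t blocks_in_flight_ = 0;
    bool merge_async_ = true;
};

The same rewind-then-retry shape appears twice in the hunks that follow: once in Worker::Merge()/AsyncMerge() and once in ReadAhead::ReadAheadIOStart()/RAResetIter().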
@@ -550,6 +550,9 @@ class CowOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::vector<CowOperation>::iterator op_iter_;
@@ -560,6 +563,15 @@ CowOpIter::CowOpIter(std::shared_ptr<std::vector<CowOperation>>& ops) {
     op_iter_ = ops_->begin();
 }
 
+bool CowOpIter::RDone() {
+    return op_iter_ == ops_->begin();
+}
+
+void CowOpIter::Prev() {
+    CHECK(!RDone());
+    op_iter_--;
+}
+
 bool CowOpIter::Done() {
     return op_iter_ == ops_->end();
 }
@@ -585,6 +597,9 @@ class CowRevMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -603,6 +618,9 @@ class CowMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -623,6 +641,15 @@ CowMergeOpIter::CowMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
     block_iter_ = merge_op_blocks->begin() + start;
 }
 
+bool CowMergeOpIter::RDone() {
+    return block_iter_ == merge_op_blocks_->begin();
+}
+
+void CowMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_iter_--;
+}
+
 bool CowMergeOpIter::Done() {
     return block_iter_ == merge_op_blocks_->end();
 }
@@ -649,6 +676,15 @@ CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>>
     block_riter_ = merge_op_blocks->rbegin();
 }
 
+bool CowRevMergeOpIter::RDone() {
+    return block_riter_ == merge_op_blocks_->rbegin();
+}
+
+void CowRevMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_riter_--;
+}
+
 bool CowRevMergeOpIter::Done() {
     return block_riter_ == merge_op_blocks_->rend() - start_;
 }
@@ -92,7 +92,7 @@ class ICowOpIter {
   public:
     virtual ~ICowOpIter() {}
 
-    // True if there are more items to read, false otherwise.
+    // True if there are no more items to read forward, false otherwise.
     virtual bool Done() = 0;
 
     // Read the current operation.
@@ -100,6 +100,12 @@ class ICowOpIter {
 
     // Advance to the next item.
     virtual void Next() = 0;
 
+    // Advance to the previous item.
+    virtual void Prev() = 0;
+
+    // True if there are no more items to read backwards, false otherwise
+    virtual bool RDone() = 0;
 };
 
 class CowReader final : public ICowReader {
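The new Prev()/RDone() pair on ICowOpIter is what makes the fallback replay possible: after a failed async batch, the caller walks the iterator backwards over exactly the ops it had already consumed. A minimal usage sketch (hypothetical helper and variable names, not code from this change):

// Rewind 'count' ops so that a synchronous retry re-reads the same window.
// RDone() guards against stepping back past the first op.
void RewindOps(ICowOpIter* iter, uint64_t count) {
    while (count && !iter->RDone()) {
        iter->Prev();
        count -= 1;
    }
}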
@@ -492,22 +492,17 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name,
         return;
     }
 
-    if (IsIouringSupported()) {
-        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
-                   partition_name, dev_sz);
-    } else {
-        int num_threads = 2;
-        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
-        size_t num_blocks_per_thread = num_blocks / num_threads;
-        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
-        off_t offset = 0;
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;
 
-        for (int i = 0; i < num_threads; i++) {
-            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
-                       dm_block_device, partition_name, offset, read_sz_per_thread);
+    for (int i = 0; i < num_threads; i++) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
+                   partition_name, offset, read_sz_per_thread);
 
-            offset += read_sz_per_thread;
-        }
+        offset += read_sz_per_thread;
     }
 }
@@ -694,10 +689,8 @@ bool SnapshotHandler::IsIouringSupported() {
     // During selinux init transition, libsnapshot will propagate the
     // status of io_uring enablement. As properties are not initialized,
    // we cannot query system property.
-    //
-    // TODO: b/219642530: Intermittent I/O failures observed
     if (is_io_uring_enabled_) {
-        return false;
+        return true;
     }
 
     // Finally check the system property
@@ -99,6 +99,7 @@ class ReadAhead {
     void InitializeRAIter();
     bool RAIterDone();
     void RAIterNext();
+    void RAResetIter(uint64_t num_blocks);
     const CowOperation* GetRAOpIter();
 
     void InitializeBuffer();
@@ -151,12 +152,16 @@ class ReadAhead {
     std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;
 
+    uint64_t total_ra_blocks_completed_ = 0;
     bool read_ahead_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // IOSQE_ASYNC flag - ASYNC flags can potentially
+    // result in EINTR; Since we don't restart
+    // syscalls and fallback to synchronous I/O, we
+    // don't want huge queue depth
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 };
@@ -210,11 +215,12 @@ class Worker {
 
     // Merge related ops
     bool Merge();
-    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool AsyncMerge();
+    bool SyncMerge();
+    bool MergeOrderedOps();
+    bool MergeOrderedOpsAsync();
+    bool MergeReplaceZeroOps();
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                     const std::unique_ptr<ICowOpIter>& cowop_iter,
                      std::vector<const CowOperation*>* replace_zero_vec = nullptr);
 
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -238,12 +244,18 @@ class Worker {
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
 
+    std::unique_ptr<ICowOpIter> cowop_iter_;
+    size_t ra_block_index_ = 0;
+    uint64_t blocks_merged_in_group_ = 0;
     bool merge_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // IOSQE_ASYNC flag - ASYNC flags can potentially
+    // result in EINTR; Since we don't restart
+    // syscalls and fallback to synchronous I/O, we
+    // don't want huge queue depth
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 
     std::shared_ptr<SnapshotHandler> snapuserd_;
@@ -24,15 +24,14 @@ using namespace android::dm;
 using android::base::unique_fd;
 
 int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                         const std::unique_ptr<ICowOpIter>& cowop_iter,
                          std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
 
     do {
-        if (!cowop_iter->Done() && num_ops) {
-            const CowOperation* cow_op = &cowop_iter->Get();
+        if (!cowop_iter_->Done() && num_ops) {
+            const CowOperation* cow_op = &cowop_iter_->Get();
             if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                 break;
             }
@@ -42,12 +41,12 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                 replace_zero_vec->push_back(cow_op);
             }
 
-            cowop_iter->Next();
+            cowop_iter_->Next();
             num_ops -= 1;
             nr_consecutive = 1;
 
-            while (!cowop_iter->Done() && num_ops) {
-                const CowOperation* op = &cowop_iter->Get();
+            while (!cowop_iter_->Done() && num_ops) {
+                const CowOperation* op = &cowop_iter_->Get();
                 if (checkOrderedOp && !IsOrderedOp(*op)) {
                     break;
                 }
@@ -63,7 +62,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
 
                 nr_consecutive += 1;
                 num_ops -= 1;
-                cowop_iter->Next();
+                cowop_iter_->Next();
             }
         }
     } while (0);
@@ -71,7 +70,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
     return nr_consecutive;
 }
 
-bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeReplaceZeroOps() {
     // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
     // of ops merged in COW file for every 8192 ops. If there is a crash,
@@ -84,15 +83,17 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)
     int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;
 
-    while (!cowop_iter->Done()) {
+    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
+
+    while (!cowop_iter_->Done()) {
         int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
         std::vector<const CowOperation*> replace_zero_vec;
         uint64_t source_offset;
 
-        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
         if (linear_blocks == 0) {
             // Merge complete
-            CHECK(cowop_iter->Done());
+            CHECK(cowop_iter_->Done());
             break;
         }
@@ -117,8 +118,8 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)
         size_t io_size = linear_blocks * BLOCK_SZ;
 
         // Merge - Write the contents back to base device
-        int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size,
-                         source_offset);
+        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
+                                            io_size, source_offset));
         if (ret < 0 || ret != io_size) {
             SNAP_LOG(ERROR)
                     << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
@@ -172,16 +173,15 @@ bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter)
     return true;
 }
 
-bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOpsAsync() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -190,11 +190,10 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -202,12 +201,13 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
         int pending_sqe = queue_depth_;
         int pending_ios_to_submit = 0;
         bool flush_required = false;
+        blocks_merged_in_group_ = 0;
 
         SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
 
             if (linear_blocks != 0) {
                 size_t io_size = (linear_blocks * BLOCK_SZ);
@@ -216,7 +216,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
                 struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                 if (!sqe) {
                     SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
@@ -225,10 +224,18 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
 
                 offset += io_size;
                 num_ops -= linear_blocks;
+                blocks_merged_in_group_ += linear_blocks;
 
                 pending_sqe -= 1;
                 pending_ios_to_submit += 1;
-                sqe->flags |= IOSQE_ASYNC;
+                // These flags are important - We need to make sure that the
+                // blocks are linked and are written in the same order as
+                // populated. This is because of overlapping block writes.
+                //
+                // If there are no dependency, we can optimize this further by
+                // allowing parallel writes; but for now, just link all the SQ
+                // entries.
+                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
             }
 
             // Ring is full or no more COW ops to be merged in this batch
@@ -256,7 +263,7 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
                     pending_sqe -= 1;
                     flush_required = false;
                     pending_ios_to_submit += 1;
-                    sqe->flags |= IOSQE_ASYNC;
+                    sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                 }
             } else {
                 flush_required = true;
@@ -269,35 +276,45 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
                     SNAP_PLOG(ERROR)
                             << "io_uring_submit failed for read-ahead: "
                             << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
                 int pending_ios_to_complete = pending_ios_to_submit;
                 pending_ios_to_submit = 0;
 
+                bool status = true;
+
                 // Reap I/O completions
                 while (pending_ios_to_complete) {
                     struct io_uring_cqe* cqe;
 
+                    // We need to make sure to reap all the I/O's submitted
+                    // even if there are any errors observed.
+                    //
+                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
+                    // these error codes are not truly I/O errors; we can retry them
+                    // by re-populating the SQE entries and submitting the I/O
+                    // request back. However, we don't do that now; instead we
+                    // will fallback to synchronous I/O.
                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
                     if (ret) {
-                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
+                        status = false;
                     }
 
                     if (cqe->res < 0) {
-                        SNAP_LOG(ERROR)
-                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
+                        status = false;
                     }
 
                     io_uring_cqe_seen(ring_.get(), cqe);
                     pending_ios_to_complete -= 1;
                 }
 
+                if (!status) {
+                    return false;
+                }
+
                 pending_sqe = queue_depth_;
             }
@@ -312,7 +329,6 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
         // Flush the data
         if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
@@ -320,35 +336,34 @@ bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter)
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
 
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
 
-bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOps() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOps started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
            break;
         }
@@ -357,11 +372,11 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -369,7 +384,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
             if (linear_blocks == 0) {
                 break;
             }
@@ -378,12 +393,13 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
             // Write to the base device. Data is already in the RA buffer. Note
             // that XOR ops is already handled by the RA thread. We just write
            // the contents out.
-            int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size,
-                             source_offset);
+            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
+                                                (char*)read_ahead_buffer + offset, io_size,
+                                                source_offset));
             if (ret < 0 || ret != io_size) {
                 SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                 << " at offset: " << source_offset << " io_size: " << io_size;
-                snapuserd_->SetMergeFailed(block_index);
+                snapuserd_->SetMergeFailed(ra_block_index_);
                 return false;
             }
 
@@ -397,7 +413,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
         // Flush the data
         if (fsync(base_path_merge_fd_.get()) < 0) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
@@ -405,47 +421,87 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
 
-bool Worker::Merge() {
-    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+bool Worker::AsyncMerge() {
+    if (!MergeOrderedOpsAsync()) {
+        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
+        // Reset the iter so that we retry the merge
+        while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
+            cowop_iter_->Prev();
+            blocks_merged_in_group_ -= 1;
+        }
+
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
+    return true;
+}
+
+bool Worker::SyncMerge() {
+    if (!MergeOrderedOps()) {
+        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOps completed";
+    return true;
+}
+
+bool Worker::Merge() {
+    cowop_iter_ = reader_->GetMergeOpIter();
+
+    bool retry = false;
+    bool ordered_ops_merge_status;
 
+    // Start Async Merge
     if (merge_async_) {
-        if (!MergeOrderedOpsAsync(cowop_iter)) {
+        ordered_ops_merge_status = AsyncMerge();
+        if (!ordered_ops_merge_status) {
+            FinalizeIouring();
+            retry = true;
+            merge_async_ = false;
+        }
+    }
+
+    // Check if we need to fallback and retry the merge
+    //
+    // If the device doesn't support async merge, we
+    // will directly enter here (aka devices with 4.x kernels)
+    const bool sync_merge_required = (retry || !merge_async_);
+
+    if (sync_merge_required) {
+        ordered_ops_merge_status = SyncMerge();
+        if (!ordered_ops_merge_status) {
+            // Merge failed. Device will continue to be mounted
+            // off snapshots; merge will be retried during
+            // next reboot
             SNAP_LOG(ERROR) << "Merge failed for ordered ops";
             snapuserd_->MergeFailed();
             return false;
         }
-        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
-    } else {
-        // Start with Copy and Xor ops
-        if (!MergeOrderedOps(cowop_iter)) {
-            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-            snapuserd_->MergeFailed();
-            return false;
-        }
-        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }
 
     // Replace and Zero ops
-    if (!MergeReplaceZeroOps(cowop_iter)) {
+    if (!MergeReplaceZeroOps()) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
         snapuserd_->MergeFailed();
         return false;
@@ -461,14 +517,6 @@ bool Worker::InitializeIouring() {
         return false;
     }
 
-    {
-        // TODO: b/219642530 - Disable io_uring for merge
-        // until we figure out the cause of intermittent
-        // IO failures.
-        merge_async_ = false;
-        return true;
-    }
-
     ring_ = std::make_unique<struct io_uring>();
 
     int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
@@ -514,7 +562,7 @@ bool Worker::RunMergeThread() {
     CloseFds();
     reader_->CloseCowFd();
 
-    SNAP_LOG(INFO) << "Merge finish";
+    SNAP_LOG(INFO) << "Snapshot-Merge completed";
 
     return true;
 }
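A small but recurring detail in the merge hunks above: the synchronous pwrite() calls are now wrapped in TEMP_FAILURE_RETRY, the Bionic/glibc macro that re-issues a call while it fails with EINTR, so a signal-interrupted write on the fallback path is retried instead of being reported as an I/O error. A standalone sketch of the same idiom, with hypothetical names (the real code simply uses the macro):

#include <errno.h>
#include <unistd.h>

// Retry a pwrite() that was interrupted by a signal; any other failure
// (or a short write) is left for the caller to treat as a real error.
ssize_t PwriteRetry(int fd, const void* buf, size_t count, off_t offset) {
    ssize_t ret;
    do {
        ret = pwrite(fd, buf, count, offset);
    } while (ret < 0 && errno == EINTR);
    return ret;
}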
@@ -279,7 +279,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
         sqe = io_uring_get_sqe(ring_.get());
         if (!sqe) {
             SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
-            snapuserd_->ReadAheadIOFailed();
             return false;
         }
 
@@ -309,7 +308,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
     if (ret != pending_ios_to_submit) {
         SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                          << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }
 
@@ -321,14 +319,12 @@ bool ReadAhead::ReadAheadAsyncIO() {
     // Read XOR data from COW file in parallel when I/O's are in-flight
     if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
         SNAP_LOG(ERROR) << "ReadXorData failed";
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }
 
     // Fetch I/O completions
     if (!ReapIoCompletions(pending_ios_to_complete)) {
         SNAP_LOG(ERROR) << "ReapIoCompletions failed";
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }
 
@@ -393,26 +389,36 @@ void ReadAhead::UpdateScratchMetadata() {
 }
 
 bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    bool status = true;
+
     // Reap I/O completions
     while (pending_ios_to_complete) {
         struct io_uring_cqe* cqe;
 
+        // We need to make sure to reap all the I/O's submitted
+        // even if there are any errors observed.
+        //
+        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
+        // these error codes are not truly I/O errors; we can retry them
+        // by re-populating the SQE entries and submitting the I/O
+        // request back. However, we don't do that now; instead we
+        // will fallback to synchronous I/O.
         int ret = io_uring_wait_cqe(ring_.get(), &cqe);
         if (ret) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-            return false;
+            status = false;
         }
 
         if (cqe->res < 0) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-            return false;
+            status = false;
        }
 
         io_uring_cqe_seen(ring_.get(), cqe);
         pending_ios_to_complete -= 1;
     }
 
-    return true;
+    return status;
 }
 
 void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
@@ -610,18 +616,38 @@ bool ReadAhead::ReadAheadIOStart() {
         return ReconstructDataFromCow();
     }
 
+    bool retry = false;
+    bool ra_status;
+
+    // Start Async read-ahead
     if (read_ahead_async_) {
-        if (!ReadAheadAsyncIO()) {
-            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
-            return false;
+        ra_status = ReadAheadAsyncIO();
+        if (!ra_status) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back synchronous I/O";
+            FinalizeIouring();
+            RAResetIter(total_blocks_merged_);
+            retry = true;
+            read_ahead_async_ = false;
        }
-    } else {
-        if (!ReadAheadSyncIO()) {
+    }
+
+    // Check if we need to fallback and retry the merge
+    //
+    // If the device doesn't support async operations, we
+    // will directly enter here (aka devices with 4.x kernels)
+
+    const bool ra_sync_required = (retry || !read_ahead_async_);
+
+    if (ra_sync_required) {
+        ra_status = ReadAheadSyncIO();
+        if (!ra_status) {
+            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
             return false;
         }
     }
 
+    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -646,6 +672,7 @@ bool ReadAhead::ReadAheadIOStart() {
         offset += BLOCK_SZ;
     }
 
+    total_ra_blocks_completed_ += total_blocks_merged_;
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
     // Flush the data only if we have a overlapping blocks in the region
@@ -763,6 +790,13 @@ void ReadAhead::RAIterNext() {
     cowop_iter_->Next();
 }
 
+void ReadAhead::RAResetIter(uint64_t num_blocks) {
+    while (num_blocks && !cowop_iter_->RDone()) {
+        cowop_iter_->Prev();
+        num_blocks -= 1;
+    }
+}
+
 const CowOperation* ReadAhead::GetRAOpIter() {
     const CowOperation* cow_op = &cowop_iter_->Get();
     return cow_op;
@@ -531,6 +531,13 @@ void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
     {
         std::unique_lock<std::mutex> lock(blk_state->m_lock);
 
+        // We may have fallback from Async-merge to synchronous merging
+        // on the existing block. There is no need to reset as the
+        // merge is already in progress.
+        if (blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS) {
+            return;
+        }
+
         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
 
         // First set the state to RA_READY so that in-flight I/O will drain