snapuserd: Create a MergeWorker class.

Merge threads and read threads share some common state but not much.
Splitting into separate classes will help isolate dm-user specific code.

Bug: 288273605
Test: snapuserd_test
Change-Id: I612374bb0072b1eedf32c30270913dbe907cc6ab
This commit is contained in:
David Anderson 2023-06-23 12:19:57 -07:00
parent 77280caed1
commit c28150f56f
5 changed files with 92 additions and 48 deletions

View file

@ -19,6 +19,7 @@
#include <android-base/logging.h>
#include "snapuserd_core.h"
#include "snapuserd_merge.h"
namespace android {
namespace snapshot {

View file

@ -23,6 +23,8 @@
#include <android-base/scopeguard.h>
#include <android-base/strings.h>
#include "snapuserd_merge.h"
namespace android {
namespace snapshot {
@ -57,8 +59,9 @@ bool SnapshotHandler::InitializeWorkers() {
worker_threads_.push_back(std::move(wt));
}
merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
merge_thread_ =
std::make_unique<MergeWorker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
GetSharedPtr());
@ -316,7 +319,7 @@ bool SnapshotHandler::Start() {
}
std::future<bool> merge_thread =
std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
// Now that the worker threads are up, scan the partitions.
if (perform_verification_) {
@ -452,5 +455,11 @@ bool SnapshotHandler::CheckPartitionVerification() {
return update_verify_->CheckPartitionVerification();
}
void SnapshotHandler::FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
merge_thread_ = nullptr;
}
} // namespace snapshot
} // namespace android

View file

@ -75,6 +75,7 @@ enum class MERGE_IO_TRANSITION {
READ_AHEAD_FAILURE,
};
class MergeWorker;
class SnapshotHandler;
enum class MERGE_GROUP_STATE {
@ -104,10 +105,9 @@ class Worker {
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool RunThread();
bool RunMergeThread();
bool Init();
private:
protected:
// Initialization
void InitializeBufsink();
bool InitializeFds();
@ -145,22 +145,9 @@ class Worker {
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
// Merge related ops
bool Merge();
bool AsyncMerge();
bool SyncMerge();
bool MergeOrderedOps();
bool MergeOrderedOpsAsync();
bool MergeReplaceZeroOps();
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
bool InitializeIouring();
void FinalizeIouring();
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
XorSink xorsink_;
@ -178,18 +165,6 @@ class Worker {
bool header_response_ = false;
std::unique_ptr<ICowOpIter> cowop_iter_;
size_t ra_block_index_ = 0;
uint64_t blocks_merged_in_group_ = 0;
bool merge_async_ = false;
// Queue depth of 8 seems optimal. We don't want
// to have a huge depth as it may put more memory pressure
// on the kernel worker threads given that we use
// IOSQE_ASYNC flag - ASYNC flags can potentially
// result in EINTR; Since we don't restart
// syscalls and fallback to synchronous I/O, we
// don't want huge queue depth
int queue_depth_ = 8;
std::unique_ptr<struct io_uring> ring_;
std::shared_ptr<SnapshotHandler> snapuserd_;
};
@ -212,11 +187,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
void FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
merge_thread_ = nullptr;
}
void FreeResources();
bool InitializeWorkers();
std::unique_ptr<CowReader> CloneReaderForWorker();
@ -330,7 +301,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
// Merge Block state
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
std::unique_ptr<Worker> merge_thread_;
std::unique_ptr<MergeWorker> merge_thread_;
double merge_completion_percentage_;
bool merge_initiated_ = false;

View file

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "snapuserd_merge.h"
#include "snapuserd_core.h"
@ -23,8 +24,14 @@ using namespace android;
using namespace android::dm;
using android::base::unique_fd;
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec) {
MergeWorker::MergeWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge,
std::shared_ptr<SnapshotHandler> snapuserd)
: Worker(cow_device, backing_device, control_device, misc_name, base_path_merge, snapuserd) {}
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec) {
int num_ops = *pending_ops;
int nr_consecutive = 0;
bool checkOrderedOp = (replace_zero_vec == nullptr);
@ -70,7 +77,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
return nr_consecutive;
}
bool Worker::MergeReplaceZeroOps() {
bool MergeWorker::MergeReplaceZeroOps() {
// Flush after merging 2MB. Since all ops are independent and there is no
// dependency between COW ops, we will flush the data and the number
// of ops merged in COW block device. If there is a crash, we will
@ -149,7 +156,7 @@ bool Worker::MergeReplaceZeroOps() {
if (snapuserd_->IsIOTerminated()) {
SNAP_LOG(ERROR)
<< "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
<< "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge";
return false;
}
}
@ -173,7 +180,7 @@ bool Worker::MergeReplaceZeroOps() {
return true;
}
bool Worker::MergeOrderedOpsAsync() {
bool MergeWorker::MergeOrderedOpsAsync() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@ -354,7 +361,7 @@ bool Worker::MergeOrderedOpsAsync() {
return true;
}
bool Worker::MergeOrderedOps() {
bool MergeWorker::MergeOrderedOps() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@ -439,7 +446,7 @@ bool Worker::MergeOrderedOps() {
return true;
}
bool Worker::AsyncMerge() {
bool MergeWorker::AsyncMerge() {
if (!MergeOrderedOpsAsync()) {
SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
// Reset the iter so that we retry the merge
@ -455,7 +462,7 @@ bool Worker::AsyncMerge() {
return true;
}
bool Worker::SyncMerge() {
bool MergeWorker::SyncMerge() {
if (!MergeOrderedOps()) {
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
return false;
@ -465,7 +472,7 @@ bool Worker::SyncMerge() {
return true;
}
bool Worker::Merge() {
bool MergeWorker::Merge() {
cowop_iter_ = reader_->GetOpIter(true);
bool retry = false;
@ -511,7 +518,7 @@ bool Worker::Merge() {
return true;
}
bool Worker::InitializeIouring() {
bool MergeWorker::InitializeIouring() {
if (!snapuserd_->IsIouringSupported()) {
return false;
}
@ -530,13 +537,13 @@ bool Worker::InitializeIouring() {
return true;
}
void Worker::FinalizeIouring() {
void MergeWorker::FinalizeIouring() {
if (merge_async_) {
io_uring_queue_exit(ring_.get());
}
}
bool Worker::RunMergeThread() {
bool MergeWorker::Run() {
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
if (!snapuserd_->WaitForMergeBegin()) {
SNAP_LOG(ERROR) << "Merge terminated early...";

View file

@ -0,0 +1,56 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "snapuserd_core.h"
namespace android {
namespace snapshot {
class MergeWorker : public Worker {
public:
MergeWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool Run();
private:
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
bool MergeReplaceZeroOps();
bool MergeOrderedOps();
bool MergeOrderedOpsAsync();
bool Merge();
bool AsyncMerge();
bool SyncMerge();
bool InitializeIouring();
void FinalizeIouring();
private:
std::unique_ptr<struct io_uring> ring_;
size_t ra_block_index_ = 0;
uint64_t blocks_merged_in_group_ = 0;
bool merge_async_ = false;
// Queue depth of 8 seems optimal. We don't want
// to have a huge depth as it may put more memory pressure
// on the kernel worker threads given that we use
// IOSQE_ASYNC flag - ASYNC flags can potentially
// result in EINTR; Since we don't restart
// syscalls and fallback to synchronous I/O, we
// don't want huge queue depth
int queue_depth_ = 8;
};
} // namespace snapshot
} // namespace android