snapuserd: Create a ReadWorker class.

This splits the dm-user specific parts of Worker into a derived class.

Bug: 288273605
Test: snapuserd_test
Change-Id: Ic0ed1a8dff30018fa8466e7dc6e92469f1c87579
This commit is contained in:
David Anderson 2023-06-23 12:39:19 -07:00
parent c28150f56f
commit d967591434
6 changed files with 92 additions and 40 deletions

View file

@ -63,8 +63,8 @@ cc_library_static {
"dm-snapshot-merge/snapuserd_readahead.cpp",
"snapuserd_buffer.cpp",
"user-space-merge/handler_manager.cpp",
"user-space-merge/read_worker.cpp",
"user-space-merge/snapuserd_core.cpp",
"user-space-merge/snapuserd_dm_user.cpp",
"user-space-merge/snapuserd_merge.cpp",
"user-space-merge/snapuserd_readahead.cpp",
"user-space-merge/snapuserd_transitions.cpp",

View file

@ -18,6 +18,7 @@
#include <android-base/logging.h>
#include "read_worker.h"
#include "snapuserd_core.h"
#include "snapuserd_merge.h"

View file

@ -14,6 +14,8 @@
* limitations under the License.
*/
#include "read_worker.h"
#include "snapuserd_core.h"
namespace android {
@ -34,6 +36,12 @@ Worker::Worker(const std::string& cow_device, const std::string& backing_device,
snapuserd_ = snapuserd;
}
ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge,
std::shared_ptr<SnapshotHandler> snapuserd)
: Worker(cow_device, backing_device, control_device, misc_name, base_path_merge, snapuserd) {}
bool Worker::InitializeFds() {
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
if (backing_store_fd_ < 0) {
@ -118,7 +126,7 @@ bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@ -126,7 +134,7 @@ bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
return true;
}
bool Worker::ProcessXorOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@ -165,7 +173,7 @@ bool Worker::ProcessZeroOp() {
return true;
}
bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@ -218,7 +226,7 @@ bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
return false;
}
bool Worker::ProcessCowOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
if (cow_op == nullptr) {
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
return false;
@ -257,7 +265,6 @@ void Worker::InitializeBufsink() {
bool Worker::Init() {
InitializeBufsink();
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
if (!InitializeFds()) {
return false;
@ -270,7 +277,15 @@ bool Worker::Init() {
return true;
}
bool Worker::RunThread() {
bool ReadWorker::Init() {
if (!Worker::Init()) {
return false;
}
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
return true;
}
bool ReadWorker::Run() {
SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@ -291,7 +306,7 @@ bool Worker::RunThread() {
}
// Send the payload/data back to dm-user misc device.
bool Worker::WriteDmUserPayload(size_t size) {
bool ReadWorker::WriteDmUserPayload(size_t size) {
size_t payload_size = size;
void* buf = bufsink_.GetPayloadBufPtr();
if (header_response_) {
@ -329,7 +344,7 @@ bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
return true;
}
bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
size_t remaining_size = sz;
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
int ret = 0;
@ -389,7 +404,7 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
return true;
}
int Worker::ReadUnalignedSector(
int ReadWorker::ReadUnalignedSector(
sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
size_t skip_sector_size = 0;
@ -424,7 +439,7 @@ int Worker::ReadUnalignedSector(
return std::min(size, (BLOCK_SZ - skip_sector_size));
}
bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
bufsink_.ResetBufferOffset();
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
@ -563,7 +578,7 @@ bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
return true;
}
void Worker::RespondIOError() {
void ReadWorker::RespondIOError() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
header->type = DM_USER_RESP_ERROR;
// This is an issue with the dm-user interface. There
@ -580,7 +595,7 @@ void Worker::RespondIOError() {
WriteDmUserPayload(0);
}
bool Worker::DmuserReadRequest() {
bool ReadWorker::DmuserReadRequest() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
// Unaligned I/O request
@ -591,7 +606,7 @@ bool Worker::DmuserReadRequest() {
return ReadAlignedSector(header->sector, header->len);
}
bool Worker::ProcessIORequest() {
bool ReadWorker::ProcessIORequest() {
// Read Header from dm-user misc device. This gives
// us the sector number for which IO is issued by dm-snapshot device
struct dm_user_header* header = bufsink_.GetHeaderPtr();

View file

@ -0,0 +1,53 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "snapuserd_core.h"
namespace android {
namespace snapshot {
class ReadWorker : public Worker {
public:
ReadWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool Run();
bool Init() override;
private:
// Functions interacting with dm-user
bool ProcessIORequest();
bool WriteDmUserPayload(size_t size);
bool DmuserReadRequest();
void RespondIOError();
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
bool ProcessCopyOp(const CowOperation* cow_op);
bool ReadAlignedSector(sector_t sector, size_t sz);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
XorSink xorsink_;
bool header_response_ = false;
};
} // namespace snapshot
} // namespace android

View file

@ -23,6 +23,7 @@
#include <android-base/scopeguard.h>
#include <android-base/strings.h>
#include "read_worker.h"
#include "snapuserd_merge.h"
namespace android {
@ -48,9 +49,8 @@ SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
bool SnapshotHandler::InitializeWorkers() {
for (int i = 0; i < num_worker_threads_; i++) {
std::unique_ptr<Worker> wt =
std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
if (!wt->Init()) {
SNAP_LOG(ERROR) << "Thread initialization failed";
return false;
@ -315,7 +315,7 @@ bool SnapshotHandler::Start() {
// Launch worker threads
for (int i = 0; i < worker_threads_.size(); i++) {
threads.emplace_back(
std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
}
std::future<bool> merge_thread =

View file

@ -76,6 +76,7 @@ enum class MERGE_IO_TRANSITION {
};
class MergeWorker;
class ReadWorker;
class SnapshotHandler;
enum class MERGE_GROUP_STATE {
@ -104,8 +105,9 @@ class Worker {
Worker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool RunThread();
bool Init();
virtual ~Worker() = default;
virtual bool Init();
protected:
// Initialization
@ -118,39 +120,21 @@ class Worker {
base_path_merge_fd_ = {};
}
// Functions interacting with dm-user
bool WriteDmUserPayload(size_t size);
bool DmuserReadRequest();
// IO Path
bool ProcessIORequest();
bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
bool ReadFromSourceDevice(const CowOperation* cow_op);
bool ReadAlignedSector(sector_t sector, size_t sz);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
void RespondIOError();
// Processing COW operations
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
// Handles Copy and Xor
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
XorSink xorsink_;
std::string cow_device_;
std::string backing_store_device_;
@ -162,7 +146,6 @@ class Worker {
unique_fd backing_store_fd_;
unique_fd base_path_merge_fd_;
unique_fd ctrl_fd_;
bool header_response_ = false;
std::unique_ptr<ICowOpIter> cowop_iter_;
@ -286,7 +269,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
void* mapped_addr_;
size_t total_mapped_addr_length_;
std::vector<std::unique_ptr<Worker>> worker_threads_;
std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
// Read-ahead related
bool populate_data_from_cow_ = false;
bool ra_thread_ = false;