Merge changes I1625d1a6,I2db9cfa2,I59c31318,Ic0ed1a8d,I612374bb into main am: 83ebc4376d
am: b19326fbad
Original change: https://android-review.googlesource.com/c/platform/system/core/+/2637513 Change-Id: Iabdcd63a1e316ed1035a28429564e8d7d6c9877b Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
commit
dff3daad7a
10 changed files with 369 additions and 211 deletions
|
@ -64,12 +64,13 @@ cc_library_static {
|
|||
"dm-snapshot-merge/snapuserd_readahead.cpp",
|
||||
"snapuserd_buffer.cpp",
|
||||
"user-space-merge/handler_manager.cpp",
|
||||
"user-space-merge/read_worker.cpp",
|
||||
"user-space-merge/snapuserd_core.cpp",
|
||||
"user-space-merge/snapuserd_dm_user.cpp",
|
||||
"user-space-merge/snapuserd_merge.cpp",
|
||||
"user-space-merge/snapuserd_readahead.cpp",
|
||||
"user-space-merge/snapuserd_transitions.cpp",
|
||||
"user-space-merge/snapuserd_verify.cpp",
|
||||
"user-space-merge/worker.cpp",
|
||||
],
|
||||
static_libs: [
|
||||
"libbase",
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
#include <android-base/logging.h>
|
||||
|
||||
#include "read_worker.h"
|
||||
#include "snapuserd_core.h"
|
||||
#include "snapuserd_merge.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "read_worker.h"
|
||||
|
||||
#include "snapuserd_core.h"
|
||||
|
||||
namespace android {
|
||||
|
@ -23,59 +25,24 @@ using namespace android;
|
|||
using namespace android::dm;
|
||||
using android::base::unique_fd;
|
||||
|
||||
Worker::Worker(const std::string& cow_device, const std::string& backing_device,
|
||||
const std::string& control_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
|
||||
cow_device_ = cow_device;
|
||||
backing_store_device_ = backing_device;
|
||||
control_device_ = control_device;
|
||||
misc_name_ = misc_name;
|
||||
base_path_merge_ = base_path_merge;
|
||||
snapuserd_ = snapuserd;
|
||||
void ReadWorker::CloseFds() {
|
||||
ctrl_fd_ = {};
|
||||
backing_store_fd_ = {};
|
||||
Worker::CloseFds();
|
||||
}
|
||||
|
||||
bool Worker::InitializeFds() {
|
||||
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
|
||||
if (backing_store_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
|
||||
if (cow_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
|
||||
if (ctrl_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Base device used by merge thread
|
||||
base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
|
||||
if (base_path_merge_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Worker::InitReader() {
|
||||
reader_ = snapuserd_->CloneReaderForWorker();
|
||||
|
||||
if (!reader_->InitForMerge(std::move(cow_fd_))) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
|
||||
const std::string& control_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge,
|
||||
std::shared_ptr<SnapshotHandler> snapuserd)
|
||||
: Worker(cow_device, misc_name, base_path_merge, snapuserd),
|
||||
backing_store_device_(backing_device),
|
||||
control_device_(control_device) {}
|
||||
|
||||
// Start the replace operation. This will read the
|
||||
// internal COW format and if the block is compressed,
|
||||
// it will be de-compressed.
|
||||
bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
|
||||
|
@ -88,7 +55,7 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
|
||||
|
@ -118,7 +85,7 @@ bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
|
|||
|
||||
// Start the copy operation. This will read the backing
|
||||
// block device which is represented by cow_op->source.
|
||||
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
|
||||
if (!ReadFromSourceDevice(cow_op)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -126,7 +93,7 @@ bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ProcessXorOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
|
||||
if (!ReadFromSourceDevice(cow_op)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -153,7 +120,7 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ProcessZeroOp() {
|
||||
bool ReadWorker::ProcessZeroOp() {
|
||||
// Zero out the entire block
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
|
@ -165,7 +132,7 @@ bool Worker::ProcessZeroOp() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
|
||||
|
@ -218,7 +185,7 @@ bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool Worker::ProcessCowOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
|
||||
if (cow_op == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
|
||||
return false;
|
||||
|
@ -246,31 +213,28 @@ bool Worker::ProcessCowOp(const CowOperation* cow_op) {
|
|||
return false;
|
||||
}
|
||||
|
||||
void Worker::InitializeBufsink() {
|
||||
// Allocate the buffer which is used to communicate between
|
||||
// daemon and dm-user. The buffer comprises of header and a fixed payload.
|
||||
// If the dm-user requests a big IO, the IO will be broken into chunks
|
||||
// of PAYLOAD_BUFFER_SZ.
|
||||
size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
|
||||
bufsink_.Initialize(buf_size);
|
||||
}
|
||||
bool ReadWorker::Init() {
|
||||
if (!Worker::Init()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
|
||||
if (backing_store_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
|
||||
if (ctrl_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool Worker::Init() {
|
||||
InitializeBufsink();
|
||||
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
|
||||
|
||||
if (!InitializeFds()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!InitReader()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Worker::RunThread() {
|
||||
bool ReadWorker::Run() {
|
||||
SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
|
||||
|
||||
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
|
||||
|
@ -291,7 +255,7 @@ bool Worker::RunThread() {
|
|||
}
|
||||
|
||||
// Send the payload/data back to dm-user misc device.
|
||||
bool Worker::WriteDmUserPayload(size_t size) {
|
||||
bool ReadWorker::WriteDmUserPayload(size_t size) {
|
||||
size_t payload_size = size;
|
||||
void* buf = bufsink_.GetPayloadBufPtr();
|
||||
if (header_response_) {
|
||||
|
@ -310,7 +274,7 @@ bool Worker::WriteDmUserPayload(size_t size) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
|
||||
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
|
||||
CHECK(read_size <= BLOCK_SZ);
|
||||
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
|
@ -329,7 +293,7 @@ bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
|
||||
bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
||||
size_t remaining_size = sz;
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
|
||||
int ret = 0;
|
||||
|
@ -389,7 +353,7 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int Worker::ReadUnalignedSector(
|
||||
int ReadWorker::ReadUnalignedSector(
|
||||
sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
|
||||
size_t skip_sector_size = 0;
|
||||
|
@ -424,7 +388,7 @@ int Worker::ReadUnalignedSector(
|
|||
return std::min(size, (BLOCK_SZ - skip_sector_size));
|
||||
}
|
||||
|
||||
bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
|
||||
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
||||
bufsink_.ResetBufferOffset();
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
|
||||
|
||||
|
@ -563,7 +527,7 @@ bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void Worker::RespondIOError() {
|
||||
void ReadWorker::RespondIOError() {
|
||||
struct dm_user_header* header = bufsink_.GetHeaderPtr();
|
||||
header->type = DM_USER_RESP_ERROR;
|
||||
// This is an issue with the dm-user interface. There
|
||||
|
@ -580,7 +544,7 @@ void Worker::RespondIOError() {
|
|||
WriteDmUserPayload(0);
|
||||
}
|
||||
|
||||
bool Worker::DmuserReadRequest() {
|
||||
bool ReadWorker::DmuserReadRequest() {
|
||||
struct dm_user_header* header = bufsink_.GetHeaderPtr();
|
||||
|
||||
// Unaligned I/O request
|
||||
|
@ -591,7 +555,7 @@ bool Worker::DmuserReadRequest() {
|
|||
return ReadAlignedSector(header->sector, header->len);
|
||||
}
|
||||
|
||||
bool Worker::ProcessIORequest() {
|
||||
bool ReadWorker::ProcessIORequest() {
|
||||
// Read Header from dm-user misc device. This gives
|
||||
// us the sector number for which IO is issued by dm-snapshot device
|
||||
struct dm_user_header* header = bufsink_.GetHeaderPtr();
|
70
fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
Normal file
70
fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
Normal file
|
@ -0,0 +1,70 @@
|
|||
// Copyright (C) 2023 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "worker.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
class ReadWorker : public Worker {
|
||||
public:
|
||||
ReadWorker(const std::string& cow_device, const std::string& backing_device,
|
||||
const std::string& control_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
|
||||
|
||||
bool Run();
|
||||
bool Init() override;
|
||||
void CloseFds() override;
|
||||
|
||||
private:
|
||||
// Functions interacting with dm-user
|
||||
bool ProcessIORequest();
|
||||
bool WriteDmUserPayload(size_t size);
|
||||
bool DmuserReadRequest();
|
||||
void RespondIOError();
|
||||
|
||||
bool ProcessCowOp(const CowOperation* cow_op);
|
||||
bool ProcessXorOp(const CowOperation* cow_op);
|
||||
bool ProcessOrderedOp(const CowOperation* cow_op);
|
||||
bool ProcessCopyOp(const CowOperation* cow_op);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op);
|
||||
bool ProcessZeroOp();
|
||||
|
||||
bool ReadAlignedSector(sector_t sector, size_t sz);
|
||||
bool ReadUnalignedSector(sector_t sector, size_t size);
|
||||
int ReadUnalignedSector(sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
|
||||
bool ReadFromSourceDevice(const CowOperation* cow_op);
|
||||
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
|
||||
|
||||
constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
|
||||
constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
|
||||
|
||||
std::string backing_store_device_;
|
||||
unique_fd backing_store_fd_;
|
||||
|
||||
std::string control_device_;
|
||||
unique_fd ctrl_fd_;
|
||||
|
||||
XorSink xorsink_;
|
||||
bool header_response_ = false;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
|
@ -23,6 +23,9 @@
|
|||
#include <android-base/scopeguard.h>
|
||||
#include <android-base/strings.h>
|
||||
|
||||
#include "read_worker.h"
|
||||
#include "snapuserd_merge.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
|
@ -46,9 +49,8 @@ SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
|
|||
|
||||
bool SnapshotHandler::InitializeWorkers() {
|
||||
for (int i = 0; i < num_worker_threads_; i++) {
|
||||
std::unique_ptr<Worker> wt =
|
||||
std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
|
||||
misc_name_, base_path_merge_, GetSharedPtr());
|
||||
auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
|
||||
misc_name_, base_path_merge_, GetSharedPtr());
|
||||
if (!wt->Init()) {
|
||||
SNAP_LOG(ERROR) << "Thread initialization failed";
|
||||
return false;
|
||||
|
@ -57,8 +59,8 @@ bool SnapshotHandler::InitializeWorkers() {
|
|||
worker_threads_.push_back(std::move(wt));
|
||||
}
|
||||
|
||||
merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
|
||||
misc_name_, base_path_merge_, GetSharedPtr());
|
||||
merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
|
||||
GetSharedPtr());
|
||||
|
||||
read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
|
||||
GetSharedPtr());
|
||||
|
@ -312,11 +314,11 @@ bool SnapshotHandler::Start() {
|
|||
// Launch worker threads
|
||||
for (int i = 0; i < worker_threads_.size(); i++) {
|
||||
threads.emplace_back(
|
||||
std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
|
||||
std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
|
||||
}
|
||||
|
||||
std::future<bool> merge_thread =
|
||||
std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
|
||||
std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
|
||||
|
||||
// Now that the worker threads are up, scan the partitions.
|
||||
if (perform_verification_) {
|
||||
|
@ -452,5 +454,11 @@ bool SnapshotHandler::CheckPartitionVerification() {
|
|||
return update_verify_->CheckPartitionVerification();
|
||||
}
|
||||
|
||||
void SnapshotHandler::FreeResources() {
|
||||
worker_threads_.clear();
|
||||
read_ahead_thread_ = nullptr;
|
||||
merge_thread_ = nullptr;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -75,7 +75,8 @@ enum class MERGE_IO_TRANSITION {
|
|||
READ_AHEAD_FAILURE,
|
||||
};
|
||||
|
||||
class SnapshotHandler;
|
||||
class MergeWorker;
|
||||
class ReadWorker;
|
||||
|
||||
enum class MERGE_GROUP_STATE {
|
||||
GROUP_MERGE_PENDING,
|
||||
|
@ -98,102 +99,6 @@ struct MergeGroupState {
|
|||
: merge_state_(state), num_ios_in_progress(n_ios) {}
|
||||
};
|
||||
|
||||
class Worker {
|
||||
public:
|
||||
Worker(const std::string& cow_device, const std::string& backing_device,
|
||||
const std::string& control_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
|
||||
bool RunThread();
|
||||
bool RunMergeThread();
|
||||
bool Init();
|
||||
|
||||
private:
|
||||
// Initialization
|
||||
void InitializeBufsink();
|
||||
bool InitializeFds();
|
||||
bool InitReader();
|
||||
void CloseFds() {
|
||||
ctrl_fd_ = {};
|
||||
backing_store_fd_ = {};
|
||||
base_path_merge_fd_ = {};
|
||||
}
|
||||
|
||||
// Functions interacting with dm-user
|
||||
bool WriteDmUserPayload(size_t size);
|
||||
bool DmuserReadRequest();
|
||||
|
||||
// IO Path
|
||||
bool ProcessIORequest();
|
||||
bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
|
||||
|
||||
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
|
||||
bool ReadFromSourceDevice(const CowOperation* cow_op);
|
||||
|
||||
bool ReadAlignedSector(sector_t sector, size_t sz);
|
||||
bool ReadUnalignedSector(sector_t sector, size_t size);
|
||||
int ReadUnalignedSector(sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
|
||||
void RespondIOError();
|
||||
|
||||
// Processing COW operations
|
||||
bool ProcessCowOp(const CowOperation* cow_op);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op);
|
||||
bool ProcessZeroOp();
|
||||
|
||||
// Handles Copy and Xor
|
||||
bool ProcessCopyOp(const CowOperation* cow_op);
|
||||
bool ProcessXorOp(const CowOperation* cow_op);
|
||||
bool ProcessOrderedOp(const CowOperation* cow_op);
|
||||
|
||||
// Merge related ops
|
||||
bool Merge();
|
||||
bool AsyncMerge();
|
||||
bool SyncMerge();
|
||||
bool MergeOrderedOps();
|
||||
bool MergeOrderedOpsAsync();
|
||||
bool MergeReplaceZeroOps();
|
||||
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
||||
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
|
||||
|
||||
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
|
||||
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
|
||||
|
||||
bool InitializeIouring();
|
||||
void FinalizeIouring();
|
||||
|
||||
std::unique_ptr<CowReader> reader_;
|
||||
BufferSink bufsink_;
|
||||
XorSink xorsink_;
|
||||
|
||||
std::string cow_device_;
|
||||
std::string backing_store_device_;
|
||||
std::string control_device_;
|
||||
std::string misc_name_;
|
||||
std::string base_path_merge_;
|
||||
|
||||
unique_fd cow_fd_;
|
||||
unique_fd backing_store_fd_;
|
||||
unique_fd base_path_merge_fd_;
|
||||
unique_fd ctrl_fd_;
|
||||
bool header_response_ = false;
|
||||
|
||||
std::unique_ptr<ICowOpIter> cowop_iter_;
|
||||
size_t ra_block_index_ = 0;
|
||||
uint64_t blocks_merged_in_group_ = 0;
|
||||
bool merge_async_ = false;
|
||||
// Queue depth of 8 seems optimal. We don't want
|
||||
// to have a huge depth as it may put more memory pressure
|
||||
// on the kernel worker threads given that we use
|
||||
// IOSQE_ASYNC flag - ASYNC flags can potentially
|
||||
// result in EINTR; Since we don't restart
|
||||
// syscalls and fallback to synchronous I/O, we
|
||||
// don't want huge queue depth
|
||||
int queue_depth_ = 8;
|
||||
std::unique_ptr<struct io_uring> ring_;
|
||||
|
||||
std::shared_ptr<SnapshotHandler> snapuserd_;
|
||||
};
|
||||
|
||||
class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
||||
public:
|
||||
SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
|
||||
|
@ -212,11 +117,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
bool CommitMerge(int num_merge_ops);
|
||||
|
||||
void CloseFds() { cow_fd_ = {}; }
|
||||
void FreeResources() {
|
||||
worker_threads_.clear();
|
||||
read_ahead_thread_ = nullptr;
|
||||
merge_thread_ = nullptr;
|
||||
}
|
||||
void FreeResources();
|
||||
|
||||
bool InitializeWorkers();
|
||||
std::unique_ptr<CowReader> CloneReaderForWorker();
|
||||
|
@ -315,7 +216,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
void* mapped_addr_;
|
||||
size_t total_mapped_addr_length_;
|
||||
|
||||
std::vector<std::unique_ptr<Worker>> worker_threads_;
|
||||
std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
|
||||
// Read-ahead related
|
||||
bool populate_data_from_cow_ = false;
|
||||
bool ra_thread_ = false;
|
||||
|
@ -330,7 +231,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
// Merge Block state
|
||||
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
|
||||
|
||||
std::unique_ptr<Worker> merge_thread_;
|
||||
std::unique_ptr<MergeWorker> merge_thread_;
|
||||
double merge_completion_percentage_;
|
||||
|
||||
bool merge_initiated_ = false;
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "snapuserd_merge.h"
|
||||
|
||||
#include "snapuserd_core.h"
|
||||
|
||||
|
@ -23,8 +24,13 @@ using namespace android;
|
|||
using namespace android::dm;
|
||||
using android::base::unique_fd;
|
||||
|
||||
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
||||
std::vector<const CowOperation*>* replace_zero_vec) {
|
||||
MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge,
|
||||
std::shared_ptr<SnapshotHandler> snapuserd)
|
||||
: Worker(cow_device, misc_name, base_path_merge, snapuserd) {}
|
||||
|
||||
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
||||
std::vector<const CowOperation*>* replace_zero_vec) {
|
||||
int num_ops = *pending_ops;
|
||||
int nr_consecutive = 0;
|
||||
bool checkOrderedOp = (replace_zero_vec == nullptr);
|
||||
|
@ -70,7 +76,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
|||
return nr_consecutive;
|
||||
}
|
||||
|
||||
bool Worker::MergeReplaceZeroOps() {
|
||||
bool MergeWorker::MergeReplaceZeroOps() {
|
||||
// Flush after merging 2MB. Since all ops are independent and there is no
|
||||
// dependency between COW ops, we will flush the data and the number
|
||||
// of ops merged in COW block device. If there is a crash, we will
|
||||
|
@ -99,17 +105,20 @@ bool Worker::MergeReplaceZeroOps() {
|
|||
|
||||
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
|
||||
const CowOperation* cow_op = replace_zero_vec[i];
|
||||
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
|
||||
return false;
|
||||
}
|
||||
if (cow_op->type == kCowReplaceOp) {
|
||||
if (!ProcessReplaceOp(cow_op)) {
|
||||
SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
|
||||
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
|
||||
SNAP_LOG(ERROR) << "Failed to read COW in merge";
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
CHECK(cow_op->type == kCowZeroOp);
|
||||
if (!ProcessZeroOp()) {
|
||||
SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
|
||||
return false;
|
||||
}
|
||||
memset(buffer, 0, BLOCK_SZ);
|
||||
}
|
||||
|
||||
bufsink_.UpdateBufferOffset(BLOCK_SZ);
|
||||
|
@ -149,7 +158,7 @@ bool Worker::MergeReplaceZeroOps() {
|
|||
|
||||
if (snapuserd_->IsIOTerminated()) {
|
||||
SNAP_LOG(ERROR)
|
||||
<< "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
|
||||
<< "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -173,7 +182,7 @@ bool Worker::MergeReplaceZeroOps() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::MergeOrderedOpsAsync() {
|
||||
bool MergeWorker::MergeOrderedOpsAsync() {
|
||||
void* mapped_addr = snapuserd_->GetMappedAddr();
|
||||
void* read_ahead_buffer =
|
||||
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
|
||||
|
@ -354,7 +363,7 @@ bool Worker::MergeOrderedOpsAsync() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::MergeOrderedOps() {
|
||||
bool MergeWorker::MergeOrderedOps() {
|
||||
void* mapped_addr = snapuserd_->GetMappedAddr();
|
||||
void* read_ahead_buffer =
|
||||
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
|
||||
|
@ -439,7 +448,7 @@ bool Worker::MergeOrderedOps() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::AsyncMerge() {
|
||||
bool MergeWorker::AsyncMerge() {
|
||||
if (!MergeOrderedOpsAsync()) {
|
||||
SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
|
||||
// Reset the iter so that we retry the merge
|
||||
|
@ -455,7 +464,7 @@ bool Worker::AsyncMerge() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::SyncMerge() {
|
||||
bool MergeWorker::SyncMerge() {
|
||||
if (!MergeOrderedOps()) {
|
||||
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
|
||||
return false;
|
||||
|
@ -465,7 +474,7 @@ bool Worker::SyncMerge() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::Merge() {
|
||||
bool MergeWorker::Merge() {
|
||||
cowop_iter_ = reader_->GetOpIter(true);
|
||||
|
||||
bool retry = false;
|
||||
|
@ -511,7 +520,7 @@ bool Worker::Merge() {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Worker::InitializeIouring() {
|
||||
bool MergeWorker::InitializeIouring() {
|
||||
if (!snapuserd_->IsIouringSupported()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -530,13 +539,13 @@ bool Worker::InitializeIouring() {
|
|||
return true;
|
||||
}
|
||||
|
||||
void Worker::FinalizeIouring() {
|
||||
void MergeWorker::FinalizeIouring() {
|
||||
if (merge_async_) {
|
||||
io_uring_queue_exit(ring_.get());
|
||||
}
|
||||
}
|
||||
|
||||
bool Worker::RunMergeThread() {
|
||||
bool MergeWorker::Run() {
|
||||
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
|
||||
if (!snapuserd_->WaitForMergeBegin()) {
|
||||
SNAP_LOG(ERROR) << "Merge terminated early...";
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
// Copyright (C) 2023 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
#pragma once
|
||||
|
||||
#include "worker.h"
|
||||
|
||||
#include <liburing.h>
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
class MergeWorker : public Worker {
|
||||
public:
|
||||
MergeWorker(const std::string& cow_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
|
||||
bool Run();
|
||||
|
||||
private:
|
||||
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
||||
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
|
||||
bool MergeReplaceZeroOps();
|
||||
bool MergeOrderedOps();
|
||||
bool MergeOrderedOpsAsync();
|
||||
bool Merge();
|
||||
bool AsyncMerge();
|
||||
bool SyncMerge();
|
||||
bool InitializeIouring();
|
||||
void FinalizeIouring();
|
||||
|
||||
private:
|
||||
std::unique_ptr<ICowOpIter> cowop_iter_;
|
||||
std::unique_ptr<struct io_uring> ring_;
|
||||
size_t ra_block_index_ = 0;
|
||||
uint64_t blocks_merged_in_group_ = 0;
|
||||
bool merge_async_ = false;
|
||||
// Queue depth of 8 seems optimal. We don't want
|
||||
// to have a huge depth as it may put more memory pressure
|
||||
// on the kernel worker threads given that we use
|
||||
// IOSQE_ASYNC flag - ASYNC flags can potentially
|
||||
// result in EINTR; Since we don't restart
|
||||
// syscalls and fallback to synchronous I/O, we
|
||||
// don't want huge queue depth
|
||||
int queue_depth_ = 8;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
80
fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
Normal file
80
fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
Normal file
|
@ -0,0 +1,80 @@
|
|||
// Copyright (C) 2023 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "worker.h"
|
||||
|
||||
#include "snapuserd_core.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
Worker::Worker(const std::string& cow_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
|
||||
cow_device_ = cow_device;
|
||||
misc_name_ = misc_name;
|
||||
base_path_merge_ = base_path_merge;
|
||||
snapuserd_ = snapuserd;
|
||||
}
|
||||
|
||||
void Worker::InitializeBufsink() {
|
||||
// Allocate the buffer which is used to communicate between
|
||||
// daemon and dm-user. The buffer comprises of header and a fixed payload.
|
||||
// If the dm-user requests a big IO, the IO will be broken into chunks
|
||||
// of PAYLOAD_BUFFER_SZ.
|
||||
size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
|
||||
bufsink_.Initialize(buf_size);
|
||||
}
|
||||
|
||||
bool Worker::Init() {
|
||||
InitializeBufsink();
|
||||
|
||||
if (!InitializeFds()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!InitReader()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Worker::InitReader() {
|
||||
reader_ = snapuserd_->CloneReaderForWorker();
|
||||
|
||||
if (!reader_->InitForMerge(std::move(cow_fd_))) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Worker::InitializeFds() {
|
||||
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
|
||||
if (cow_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Base device used by merge thread
|
||||
base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
|
||||
if (base_path_merge_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
65
fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
Normal file
65
fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
Normal file
|
@ -0,0 +1,65 @@
|
|||
// Copyright (C) 2023 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include <android-base/unique_fd.h>
|
||||
#include <libsnapshot/cow_reader.h>
|
||||
#include <snapuserd/snapuserd_buffer.h>
|
||||
#include <snapuserd/snapuserd_kernel.h>
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
using android::base::unique_fd;
|
||||
|
||||
class SnapshotHandler;
|
||||
|
||||
class Worker {
|
||||
public:
|
||||
Worker(const std::string& cow_device, const std::string& misc_name,
|
||||
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
|
||||
virtual ~Worker() = default;
|
||||
|
||||
virtual bool Init();
|
||||
|
||||
protected:
|
||||
// Initialization
|
||||
void InitializeBufsink();
|
||||
bool InitializeFds();
|
||||
bool InitReader();
|
||||
virtual void CloseFds() { base_path_merge_fd_ = {}; }
|
||||
|
||||
std::unique_ptr<CowReader> reader_;
|
||||
BufferSink bufsink_;
|
||||
|
||||
std::string misc_name_; // Needed for SNAP_LOG.
|
||||
|
||||
unique_fd base_path_merge_fd_;
|
||||
|
||||
std::shared_ptr<SnapshotHandler> snapuserd_;
|
||||
|
||||
private:
|
||||
std::string cow_device_;
|
||||
std::string base_path_merge_;
|
||||
unique_fd cow_fd_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
Loading…
Reference in a new issue