Merge changes I1625d1a6,I2db9cfa2,I59c31318,Ic0ed1a8d,I612374bb into main am: 83ebc4376d am: b19326fbad

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2637513

Change-Id: Iabdcd63a1e316ed1035a28429564e8d7d6c9877b
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
David Anderson 2023-07-17 20:12:55 +00:00 committed by Automerger Merge Worker
commit dff3daad7a
10 changed files with 369 additions and 211 deletions

View file

@ -64,12 +64,13 @@ cc_library_static {
"dm-snapshot-merge/snapuserd_readahead.cpp",
"snapuserd_buffer.cpp",
"user-space-merge/handler_manager.cpp",
"user-space-merge/read_worker.cpp",
"user-space-merge/snapuserd_core.cpp",
"user-space-merge/snapuserd_dm_user.cpp",
"user-space-merge/snapuserd_merge.cpp",
"user-space-merge/snapuserd_readahead.cpp",
"user-space-merge/snapuserd_transitions.cpp",
"user-space-merge/snapuserd_verify.cpp",
"user-space-merge/worker.cpp",
],
static_libs: [
"libbase",

View file

@ -18,7 +18,9 @@
#include <android-base/logging.h>
#include "read_worker.h"
#include "snapuserd_core.h"
#include "snapuserd_merge.h"
namespace android {
namespace snapshot {

View file

@ -14,6 +14,8 @@
* limitations under the License.
*/
#include "read_worker.h"
#include "snapuserd_core.h"
namespace android {
@ -23,59 +25,24 @@ using namespace android;
using namespace android::dm;
using android::base::unique_fd;
Worker::Worker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
cow_device_ = cow_device;
backing_store_device_ = backing_device;
control_device_ = control_device;
misc_name_ = misc_name;
base_path_merge_ = base_path_merge;
snapuserd_ = snapuserd;
void ReadWorker::CloseFds() {
ctrl_fd_ = {};
backing_store_fd_ = {};
Worker::CloseFds();
}
bool Worker::InitializeFds() {
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
if (backing_store_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
return false;
}
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
if (cow_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
return false;
}
ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
if (ctrl_fd_ < 0) {
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
return false;
}
// Base device used by merge thread
base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
if (base_path_merge_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
return false;
}
return true;
}
bool Worker::InitReader() {
reader_ = snapuserd_->CloneReaderForWorker();
if (!reader_->InitForMerge(std::move(cow_fd_))) {
return false;
}
return true;
}
ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge,
std::shared_ptr<SnapshotHandler> snapuserd)
: Worker(cow_device, misc_name, base_path_merge, snapuserd),
backing_store_device_(backing_device),
control_device_(control_device) {}
// Start the replace operation. This will read the
// internal COW format and if the block is compressed,
// it will be de-compressed.
bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
@ -88,7 +55,7 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
return true;
}
bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
@ -118,7 +85,7 @@ bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@ -126,7 +93,7 @@ bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
return true;
}
bool Worker::ProcessXorOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
@ -153,7 +120,7 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) {
return true;
}
bool Worker::ProcessZeroOp() {
bool ReadWorker::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
@ -165,7 +132,7 @@ bool Worker::ProcessZeroOp() {
return true;
}
bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@ -218,7 +185,7 @@ bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
return false;
}
bool Worker::ProcessCowOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
if (cow_op == nullptr) {
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
return false;
@ -246,31 +213,28 @@ bool Worker::ProcessCowOp(const CowOperation* cow_op) {
return false;
}
void Worker::InitializeBufsink() {
// Allocate the buffer which is used to communicate between
// daemon and dm-user. The buffer comprises of header and a fixed payload.
// If the dm-user requests a big IO, the IO will be broken into chunks
// of PAYLOAD_BUFFER_SZ.
size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
bufsink_.Initialize(buf_size);
}
bool ReadWorker::Init() {
if (!Worker::Init()) {
return false;
}
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
if (backing_store_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
return false;
}
ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
if (ctrl_fd_ < 0) {
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
return false;
}
bool Worker::Init() {
InitializeBufsink();
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
if (!InitializeFds()) {
return false;
}
if (!InitReader()) {
return false;
}
return true;
}
bool Worker::RunThread() {
bool ReadWorker::Run() {
SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@ -291,7 +255,7 @@ bool Worker::RunThread() {
}
// Send the payload/data back to dm-user misc device.
bool Worker::WriteDmUserPayload(size_t size) {
bool ReadWorker::WriteDmUserPayload(size_t size) {
size_t payload_size = size;
void* buf = bufsink_.GetPayloadBufPtr();
if (header_response_) {
@ -310,7 +274,7 @@ bool Worker::WriteDmUserPayload(size_t size) {
return true;
}
bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
CHECK(read_size <= BLOCK_SZ);
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@ -329,7 +293,7 @@ bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
return true;
}
bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
size_t remaining_size = sz;
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
int ret = 0;
@ -389,7 +353,7 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
return true;
}
int Worker::ReadUnalignedSector(
int ReadWorker::ReadUnalignedSector(
sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
size_t skip_sector_size = 0;
@ -424,7 +388,7 @@ int Worker::ReadUnalignedSector(
return std::min(size, (BLOCK_SZ - skip_sector_size));
}
bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
bufsink_.ResetBufferOffset();
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
@ -563,7 +527,7 @@ bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
return true;
}
void Worker::RespondIOError() {
void ReadWorker::RespondIOError() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
header->type = DM_USER_RESP_ERROR;
// This is an issue with the dm-user interface. There
@ -580,7 +544,7 @@ void Worker::RespondIOError() {
WriteDmUserPayload(0);
}
bool Worker::DmuserReadRequest() {
bool ReadWorker::DmuserReadRequest() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
// Unaligned I/O request
@ -591,7 +555,7 @@ bool Worker::DmuserReadRequest() {
return ReadAlignedSector(header->sector, header->len);
}
bool Worker::ProcessIORequest() {
bool ReadWorker::ProcessIORequest() {
// Read Header from dm-user misc device. This gives
// us the sector number for which IO is issued by dm-snapshot device
struct dm_user_header* header = bufsink_.GetHeaderPtr();

View file

@ -0,0 +1,70 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <utility>
#include <vector>
#include "worker.h"
namespace android {
namespace snapshot {
class ReadWorker : public Worker {
public:
ReadWorker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool Run();
bool Init() override;
void CloseFds() override;
private:
// Functions interacting with dm-user
bool ProcessIORequest();
bool WriteDmUserPayload(size_t size);
bool DmuserReadRequest();
void RespondIOError();
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
bool ReadAlignedSector(sector_t sector, size_t sz);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
bool ReadFromSourceDevice(const CowOperation* cow_op);
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
std::string backing_store_device_;
unique_fd backing_store_fd_;
std::string control_device_;
unique_fd ctrl_fd_;
XorSink xorsink_;
bool header_response_ = false;
};
} // namespace snapshot
} // namespace android

View file

@ -23,6 +23,9 @@
#include <android-base/scopeguard.h>
#include <android-base/strings.h>
#include "read_worker.h"
#include "snapuserd_merge.h"
namespace android {
namespace snapshot {
@ -46,9 +49,8 @@ SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
bool SnapshotHandler::InitializeWorkers() {
for (int i = 0; i < num_worker_threads_; i++) {
std::unique_ptr<Worker> wt =
std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
if (!wt->Init()) {
SNAP_LOG(ERROR) << "Thread initialization failed";
return false;
@ -57,8 +59,8 @@ bool SnapshotHandler::InitializeWorkers() {
worker_threads_.push_back(std::move(wt));
}
merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
misc_name_, base_path_merge_, GetSharedPtr());
merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
GetSharedPtr());
read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
GetSharedPtr());
@ -312,11 +314,11 @@ bool SnapshotHandler::Start() {
// Launch worker threads
for (int i = 0; i < worker_threads_.size(); i++) {
threads.emplace_back(
std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
}
std::future<bool> merge_thread =
std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
// Now that the worker threads are up, scan the partitions.
if (perform_verification_) {
@ -452,5 +454,11 @@ bool SnapshotHandler::CheckPartitionVerification() {
return update_verify_->CheckPartitionVerification();
}
void SnapshotHandler::FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
merge_thread_ = nullptr;
}
} // namespace snapshot
} // namespace android

View file

@ -75,7 +75,8 @@ enum class MERGE_IO_TRANSITION {
READ_AHEAD_FAILURE,
};
class SnapshotHandler;
class MergeWorker;
class ReadWorker;
enum class MERGE_GROUP_STATE {
GROUP_MERGE_PENDING,
@ -98,102 +99,6 @@ struct MergeGroupState {
: merge_state_(state), num_ios_in_progress(n_ios) {}
};
class Worker {
public:
Worker(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool RunThread();
bool RunMergeThread();
bool Init();
private:
// Initialization
void InitializeBufsink();
bool InitializeFds();
bool InitReader();
void CloseFds() {
ctrl_fd_ = {};
backing_store_fd_ = {};
base_path_merge_fd_ = {};
}
// Functions interacting with dm-user
bool WriteDmUserPayload(size_t size);
bool DmuserReadRequest();
// IO Path
bool ProcessIORequest();
bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
bool ReadFromSourceDevice(const CowOperation* cow_op);
bool ReadAlignedSector(sector_t sector, size_t sz);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
void RespondIOError();
// Processing COW operations
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
// Handles Copy and Xor
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
// Merge related ops
bool Merge();
bool AsyncMerge();
bool SyncMerge();
bool MergeOrderedOps();
bool MergeOrderedOpsAsync();
bool MergeReplaceZeroOps();
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
bool InitializeIouring();
void FinalizeIouring();
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
XorSink xorsink_;
std::string cow_device_;
std::string backing_store_device_;
std::string control_device_;
std::string misc_name_;
std::string base_path_merge_;
unique_fd cow_fd_;
unique_fd backing_store_fd_;
unique_fd base_path_merge_fd_;
unique_fd ctrl_fd_;
bool header_response_ = false;
std::unique_ptr<ICowOpIter> cowop_iter_;
size_t ra_block_index_ = 0;
uint64_t blocks_merged_in_group_ = 0;
bool merge_async_ = false;
// Queue depth of 8 seems optimal. We don't want
// to have a huge depth as it may put more memory pressure
// on the kernel worker threads given that we use
// IOSQE_ASYNC flag - ASYNC flags can potentially
// result in EINTR; Since we don't restart
// syscalls and fallback to synchronous I/O, we
// don't want huge queue depth
int queue_depth_ = 8;
std::unique_ptr<struct io_uring> ring_;
std::shared_ptr<SnapshotHandler> snapuserd_;
};
class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
public:
SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
@ -212,11 +117,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
void FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
merge_thread_ = nullptr;
}
void FreeResources();
bool InitializeWorkers();
std::unique_ptr<CowReader> CloneReaderForWorker();
@ -315,7 +216,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
void* mapped_addr_;
size_t total_mapped_addr_length_;
std::vector<std::unique_ptr<Worker>> worker_threads_;
std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
// Read-ahead related
bool populate_data_from_cow_ = false;
bool ra_thread_ = false;
@ -330,7 +231,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
// Merge Block state
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
std::unique_ptr<Worker> merge_thread_;
std::unique_ptr<MergeWorker> merge_thread_;
double merge_completion_percentage_;
bool merge_initiated_ = false;

View file

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "snapuserd_merge.h"
#include "snapuserd_core.h"
@ -23,8 +24,13 @@ using namespace android;
using namespace android::dm;
using android::base::unique_fd;
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec) {
MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
const std::string& base_path_merge,
std::shared_ptr<SnapshotHandler> snapuserd)
: Worker(cow_device, misc_name, base_path_merge, snapuserd) {}
int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec) {
int num_ops = *pending_ops;
int nr_consecutive = 0;
bool checkOrderedOp = (replace_zero_vec == nullptr);
@ -70,7 +76,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
return nr_consecutive;
}
bool Worker::MergeReplaceZeroOps() {
bool MergeWorker::MergeReplaceZeroOps() {
// Flush after merging 2MB. Since all ops are independent and there is no
// dependency between COW ops, we will flush the data and the number
// of ops merged in COW block device. If there is a crash, we will
@ -99,17 +105,20 @@ bool Worker::MergeReplaceZeroOps() {
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
const CowOperation* cow_op = replace_zero_vec[i];
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
return false;
}
if (cow_op->type == kCowReplaceOp) {
if (!ProcessReplaceOp(cow_op)) {
SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
SNAP_LOG(ERROR) << "Failed to read COW in merge";
return false;
}
} else {
CHECK(cow_op->type == kCowZeroOp);
if (!ProcessZeroOp()) {
SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
return false;
}
memset(buffer, 0, BLOCK_SZ);
}
bufsink_.UpdateBufferOffset(BLOCK_SZ);
@ -149,7 +158,7 @@ bool Worker::MergeReplaceZeroOps() {
if (snapuserd_->IsIOTerminated()) {
SNAP_LOG(ERROR)
<< "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
<< "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge";
return false;
}
}
@ -173,7 +182,7 @@ bool Worker::MergeReplaceZeroOps() {
return true;
}
bool Worker::MergeOrderedOpsAsync() {
bool MergeWorker::MergeOrderedOpsAsync() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@ -354,7 +363,7 @@ bool Worker::MergeOrderedOpsAsync() {
return true;
}
bool Worker::MergeOrderedOps() {
bool MergeWorker::MergeOrderedOps() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@ -439,7 +448,7 @@ bool Worker::MergeOrderedOps() {
return true;
}
bool Worker::AsyncMerge() {
bool MergeWorker::AsyncMerge() {
if (!MergeOrderedOpsAsync()) {
SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
// Reset the iter so that we retry the merge
@ -455,7 +464,7 @@ bool Worker::AsyncMerge() {
return true;
}
bool Worker::SyncMerge() {
bool MergeWorker::SyncMerge() {
if (!MergeOrderedOps()) {
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
return false;
@ -465,7 +474,7 @@ bool Worker::SyncMerge() {
return true;
}
bool Worker::Merge() {
bool MergeWorker::Merge() {
cowop_iter_ = reader_->GetOpIter(true);
bool retry = false;
@ -511,7 +520,7 @@ bool Worker::Merge() {
return true;
}
bool Worker::InitializeIouring() {
bool MergeWorker::InitializeIouring() {
if (!snapuserd_->IsIouringSupported()) {
return false;
}
@ -530,13 +539,13 @@ bool Worker::InitializeIouring() {
return true;
}
void Worker::FinalizeIouring() {
void MergeWorker::FinalizeIouring() {
if (merge_async_) {
io_uring_queue_exit(ring_.get());
}
}
bool Worker::RunMergeThread() {
bool MergeWorker::Run() {
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
if (!snapuserd_->WaitForMergeBegin()) {
SNAP_LOG(ERROR) << "Merge terminated early...";

View file

@ -0,0 +1,58 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "worker.h"
#include <liburing.h>
namespace android {
namespace snapshot {
class MergeWorker : public Worker {
public:
MergeWorker(const std::string& cow_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
bool Run();
private:
int PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
bool MergeReplaceZeroOps();
bool MergeOrderedOps();
bool MergeOrderedOpsAsync();
bool Merge();
bool AsyncMerge();
bool SyncMerge();
bool InitializeIouring();
void FinalizeIouring();
private:
std::unique_ptr<ICowOpIter> cowop_iter_;
std::unique_ptr<struct io_uring> ring_;
size_t ra_block_index_ = 0;
uint64_t blocks_merged_in_group_ = 0;
bool merge_async_ = false;
// Queue depth of 8 seems optimal. We don't want
// to have a huge depth as it may put more memory pressure
// on the kernel worker threads given that we use
// IOSQE_ASYNC flag - ASYNC flags can potentially
// result in EINTR; Since we don't restart
// syscalls and fallback to synchronous I/O, we
// don't want huge queue depth
int queue_depth_ = 8;
};
} // namespace snapshot
} // namespace android

View file

@ -0,0 +1,80 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "worker.h"
#include "snapuserd_core.h"
namespace android {
namespace snapshot {
Worker::Worker(const std::string& cow_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
cow_device_ = cow_device;
misc_name_ = misc_name;
base_path_merge_ = base_path_merge;
snapuserd_ = snapuserd;
}
void Worker::InitializeBufsink() {
// Allocate the buffer which is used to communicate between
// daemon and dm-user. The buffer comprises of header and a fixed payload.
// If the dm-user requests a big IO, the IO will be broken into chunks
// of PAYLOAD_BUFFER_SZ.
size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
bufsink_.Initialize(buf_size);
}
bool Worker::Init() {
InitializeBufsink();
if (!InitializeFds()) {
return false;
}
if (!InitReader()) {
return false;
}
return true;
}
bool Worker::InitReader() {
reader_ = snapuserd_->CloneReaderForWorker();
if (!reader_->InitForMerge(std::move(cow_fd_))) {
return false;
}
return true;
}
bool Worker::InitializeFds() {
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
if (cow_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
return false;
}
// Base device used by merge thread
base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
if (base_path_merge_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
return false;
}
return true;
}
} // namespace snapshot
} // namespace android

View file

@ -0,0 +1,65 @@
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stddef.h>
#include <memory>
#include <string>
#include <android-base/unique_fd.h>
#include <libsnapshot/cow_reader.h>
#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>
namespace android {
namespace snapshot {
using android::base::unique_fd;
class SnapshotHandler;
class Worker {
public:
Worker(const std::string& cow_device, const std::string& misc_name,
const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
virtual ~Worker() = default;
virtual bool Init();
protected:
// Initialization
void InitializeBufsink();
bool InitializeFds();
bool InitReader();
virtual void CloseFds() { base_path_merge_fd_ = {}; }
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
std::string misc_name_; // Needed for SNAP_LOG.
unique_fd base_path_merge_fd_;
std::shared_ptr<SnapshotHandler> snapuserd_;
private:
std::string cow_device_;
std::string base_path_merge_;
unique_fd cow_fd_;
};
} // namespace snapshot
} // namespace android