Merge "Support multi-threaded compression in COW v3" into main

This commit is contained in:
Kelvin Zhang 2024-01-05 22:06:40 +00:00 committed by Gerrit Code Review
commit a79a9e3cfe
2 changed files with 106 additions and 20 deletions

View file

@ -34,6 +34,7 @@
#include <zlib.h>
#include <fcntl.h>
#include <libsnapshot/cow_compress.h>
#include <libsnapshot_cow/parser_v3.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
@ -55,11 +56,35 @@ static_assert(sizeof(off_t) == sizeof(uint64_t));
using android::base::unique_fd;
// Divide |x| by |y| and round up to the nearest integer.
constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
return (x + y - 1) / y;
}
CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
: CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
SetupHeaders();
}
void CowWriterV3::InitWorkers() {
if (num_compress_threads_ <= 1) {
LOG_INFO << "Not creating new threads for compression.";
return;
}
compress_threads_.reserve(num_compress_threads_);
compress_threads_.clear();
threads_.reserve(num_compress_threads_);
threads_.clear();
for (size_t i = 0; i < num_compress_threads_; i++) {
std::unique_ptr<ICompressor> compressor =
ICompressor::Create(compression_, header_.block_size);
auto&& wt = compress_threads_.emplace_back(
std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
}
LOG(INFO) << num_compress_threads_ << " thread used for compression";
}
void CowWriterV3::SetupHeaders() {
header_ = {};
header_.prefix.magic = kCowMagicNumber;
@ -135,10 +160,24 @@ bool CowWriterV3::ParseOptions() {
} else {
LOG(INFO) << "Batch writes: disabled";
}
if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
options_.num_compress_threads) {
num_compress_threads_ = options_.num_compress_threads;
}
InitWorkers();
return true;
}
CowWriterV3::~CowWriterV3() {}
CowWriterV3::~CowWriterV3() {
for (const auto& t : compress_threads_) {
t->Finalize();
}
for (auto& t : threads_) {
if (t.joinable()) {
t.join();
}
}
}
bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
if (!InitFd() || !ParseOptions()) {
@ -289,19 +328,24 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
<< " but compressor is uninitialized.";
return false;
}
const auto bytes = reinterpret_cast<const uint8_t*>(data);
const size_t num_blocks = (size / header_.block_size);
for (size_t i = 0; i < num_blocks;) {
const auto blocks_to_write =
std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
size_t compressed_bytes = 0;
auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
if (blocks.size() != blocks_to_write) {
LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
<< blocks_to_write << ", actual number of blocks received from compressor "
<< blocks.size();
return false;
}
for (size_t j = 0; j < blocks_to_write; j++) {
const uint8_t* const iter =
reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
CowOperation& op = cached_ops_.emplace_back();
auto& vec = data_vec_.emplace_back();
auto& compressed_data = cached_data_.emplace_back();
auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
op.new_block = new_block_start + i + j;
op.set_type(type);
@ -310,20 +354,6 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
} else {
op.set_source(next_data_pos_ + compressed_bytes);
}
if (compression_.algorithm == kCowCompressNone) {
compressed_data.resize(header_.block_size);
} else {
compressed_data = compressor_->Compress(iter, header_.block_size);
if (compressed_data.empty()) {
LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
<< num_blocks << ");";
return false;
}
}
if (compressed_data.size() >= header_.block_size) {
compressed_data.resize(header_.block_size);
std::memcpy(compressed_data.data(), iter, header_.block_size);
}
vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
op.data_length = vec.iov_len;
compressed_bytes += op.data_length;
@ -443,6 +473,57 @@ bool CowWriterV3::FlushCacheOps() {
return true;
}
std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
const void* data) {
const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
std::vector<std::basic_string<uint8_t>> compressed_buf;
compressed_buf.clear();
const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
if (compression_.algorithm == kCowCompressNone) {
for (size_t i = 0; i < num_blocks; i++) {
auto& buf = compressed_buf.emplace_back();
buf.resize(header_.block_size);
std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
}
return compressed_buf;
}
if (num_threads <= 1) {
if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
&compressed_buf)) {
return {};
}
} else {
// Submit the blocks per thread. The retrieval of
// compressed buffers has to be done in the same order.
// We should not poll for completed buffers in a different order as the
// buffers are tightly coupled with block ordering.
for (size_t i = 0; i < num_threads; i++) {
CompressWorker* worker = compress_threads_[i].get();
const auto blocks_in_batch =
std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
blocks_in_batch);
}
for (size_t i = 0; i < num_threads; i++) {
CompressWorker* worker = compress_threads_[i].get();
if (!worker->GetCompressedBuffers(&compressed_buf)) {
return {};
}
}
}
for (size_t i = 0; i < num_blocks; i++) {
auto& block = compressed_buf[i];
if (block.size() >= header_.block_size) {
block.resize(header_.block_size);
std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
}
}
return compressed_buf;
}
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
std::basic_string_view<struct iovec> data) {
const auto total_data_size =

View file

@ -16,6 +16,7 @@
#include <android-base/logging.h>
#include <string_view>
#include <thread>
#include <vector>
#include "writer_base.h"
@ -51,12 +52,14 @@ class CowWriterV3 : public CowWriterBase {
std::basic_string_view<struct iovec> data);
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, CowOperationType type);
bool CompressBlocks(size_t num_blocks, const void* data);
bool CheckOpCount(size_t op_count);
private:
std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
const void* data);
bool ReadBackVerification();
bool FlushCacheOps();
void InitWorkers();
CowHeaderV3 header_{};
CowCompression compression_;
// in the case that we are using one thread for compression, we can store and re-use the same
@ -75,6 +78,8 @@ class CowWriterV3 : public CowWriterBase {
std::vector<CowOperationV3> cached_ops_;
std::vector<std::basic_string<uint8_t>> cached_data_;
std::vector<struct iovec> data_vec_;
std::vector<std::thread> threads_;
};
} // namespace snapshot