// Review notes for the patch below (writer_v3.cpp / writer_v3.h, class CowWriterV3).
//
// What the patch does:
//   * Adds DivRoundUp(x, y), a constexpr helper computing (x + y - 1) / y.
//   * Adds InitWorkers(): when num_compress_threads_ > 1 it creates one compressor
//     (ICompressor::Create) and one worker per thread in compress_threads_, and starts
//     a std::thread per worker that calls RunThread(); otherwise it only logs.
//   * ParseOptions(): copies options_.num_compress_threads into num_compress_threads_
//     when the "ro.virtual_ab.compression.threads" property reads true and the option
//     is non-zero, then calls InitWorkers().
//   * ~CowWriterV3(): calls Finalize() on every worker, then joins every joinable thread.
//   * EmitBlocks(): no longer compresses inline; each batch is passed to
//     CompressBlocks() and the call fails if the number of returned buffers differs
//     from blocks_to_write. The returned buffers are moved into cached_data_.
//   * CompressBlocks(): now returns one buffer per block. kCowCompressNone copies each
//     block verbatim; num_threads <= 1 uses CompressWorker::CompressBlocks with
//     compressor_; otherwise the blocks are split into chunks of blocks_per_thread and
//     enqueued on the workers, and results are collected in worker order. Any buffer
//     whose size is >= header_.block_size is replaced by the raw block.
//   * writer_v3.h: the CompressBlocks declaration moves below `private:` with the new
//     return type, InitWorkers() is declared, a threads_ member and one #include are
//     added.
//
// State of this copy: line breaks are collapsed and the #include targets and template
// argument lists are missing (e.g. `#include` with no header, `reinterpret_cast(data)`),
// so the text needs to be regenerated from the original change before it can be applied.
//
// NOTE(review): CompressBlocks, threaded branch. `num_blocks - i * blocks_per_thread`
//   is size_t arithmetic and blocks_per_thread is rounded up, so i * blocks_per_thread
//   can exceed num_blocks for the last workers. Example: 5 blocks over 4 threads gives
//   blocks_per_thread == 2 and, at i == 3, 5 - 6, which wraps; std::min then selects
//   blocks_per_thread and that worker is handed 2 blocks starting 6 blocks into a
//   5-block batch. Suggested fix: return early when num_blocks == 0, then run both the
//   enqueue loop and the retrieval loop over DivRoundUp(num_blocks, blocks_per_thread)
//   workers instead of num_threads.
// NOTE(review): CompressBlocks evaluates DivRoundUp(num_blocks, num_threads) before the
//   `num_threads <= 1` check. If num_compress_threads_ can ever be 0 while
//   num_blocks != 1 this divides by zero; its initial value is not visible here -
//   confirm, or compute blocks_per_thread inside the threaded branch only.
// NOTE(review): the final loop in CompressBlocks indexes compressed_buf[i] for every
//   i < num_blocks before EmitBlocks compares sizes. Presumably the workers always
//   return exactly the enqueued number of buffers - verify, or check
//   compressed_buf.size() before that loop.
// NOTE(review): InitWorkers logs with `LOG_INFO <<` in one place and `LOG(INFO) <<` in
//   the other; LOG_INFO is not defined in the visible text - confirm it exists.
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp index e73684738..be6b6daa6 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp @@ -34,6 +34,7 @@ #include #include +#include #include #include #include @@ -55,11 +56,35 @@ static_assert(sizeof(off_t) == sizeof(uint64_t)); using android::base::unique_fd; +// Divide |x| by |y| and round up to the nearest integer. +constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) { + return (x + y - 1) / y; +} + CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd) : CowWriterBase(options, std::move(fd)), batch_size_(std::max(options.cluster_ops, 1)) { SetupHeaders(); } +void CowWriterV3::InitWorkers() { + if (num_compress_threads_ <= 1) { + LOG_INFO << "Not creating new threads for compression."; + return; + } + compress_threads_.reserve(num_compress_threads_); + compress_threads_.clear(); + threads_.reserve(num_compress_threads_); + threads_.clear(); + for (size_t i = 0; i < num_compress_threads_; i++) { + std::unique_ptr compressor = + ICompressor::Create(compression_, header_.block_size); + auto&& wt = compress_threads_.emplace_back( + std::make_unique(std::move(compressor), header_.block_size)); + threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); })); + } + LOG(INFO) << num_compress_threads_ << " thread used for compression"; +} + void CowWriterV3::SetupHeaders() { header_ = {}; header_.prefix.magic = kCowMagicNumber; @@ -135,10 +160,24 @@ bool CowWriterV3::ParseOptions() { } else { LOG(INFO) << "Batch writes: disabled"; } + if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) && + options_.num_compress_threads) { + num_compress_threads_ = options_.num_compress_threads; + } + InitWorkers(); return true; } -CowWriterV3::~CowWriterV3() {} +CowWriterV3::~CowWriterV3() { + for (const auto& t : compress_threads_) { + t->Finalize(); + } + for 
(auto& t : threads_) { + if (t.joinable()) { + t.join(); + } + } +} bool CowWriterV3::Initialize(std::optional label) { if (!InitFd() || !ParseOptions()) { @@ -289,19 +328,24 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t << " but compressor is uninitialized."; return false; } + const auto bytes = reinterpret_cast(data); const size_t num_blocks = (size / header_.block_size); for (size_t i = 0; i < num_blocks;) { const auto blocks_to_write = std::min(batch_size_ - cached_data_.size(), num_blocks - i); size_t compressed_bytes = 0; + auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i); + if (blocks.size() != blocks_to_write) { + LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", " + << blocks_to_write << ", actual number of blocks received from compressor " + << blocks.size(); + return false; + } for (size_t j = 0; j < blocks_to_write; j++) { - const uint8_t* const iter = - reinterpret_cast(data) + (header_.block_size * (i + j)); - CowOperation& op = cached_ops_.emplace_back(); auto& vec = data_vec_.emplace_back(); - auto& compressed_data = cached_data_.emplace_back(); + auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j])); op.new_block = new_block_start + i + j; op.set_type(type); @@ -310,20 +354,6 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t } else { op.set_source(next_data_pos_ + compressed_bytes); } - if (compression_.algorithm == kCowCompressNone) { - compressed_data.resize(header_.block_size); - } else { - compressed_data = compressor_->Compress(iter, header_.block_size); - if (compressed_data.empty()) { - LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", " - << num_blocks << ");"; - return false; - } - } - if (compressed_data.size() >= header_.block_size) { - compressed_data.resize(header_.block_size); - std::memcpy(compressed_data.data(), iter, header_.block_size); - } vec = {.iov_base = 
compressed_data.data(), .iov_len = compressed_data.size()}; op.data_length = vec.iov_len; compressed_bytes += op.data_length; @@ -443,6 +473,57 @@ bool CowWriterV3::FlushCacheOps() { return true; } +std::vector> CowWriterV3::CompressBlocks(const size_t num_blocks, + const void* data) { + const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_; + const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); + std::vector> compressed_buf; + compressed_buf.clear(); + const uint8_t* const iter = reinterpret_cast(data); + if (compression_.algorithm == kCowCompressNone) { + for (size_t i = 0; i < num_blocks; i++) { + auto& buf = compressed_buf.emplace_back(); + buf.resize(header_.block_size); + std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size); + } + return compressed_buf; + } + if (num_threads <= 1) { + if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks, + &compressed_buf)) { + return {}; + } + } else { + // Submit the blocks per thread. The retrieval of + // compressed buffers has to be done in the same order. + // We should not poll for completed buffers in a different order as the + // buffers are tightly coupled with block ordering. 
+ for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + const auto blocks_in_batch = + std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); + worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size, + blocks_in_batch); + } + + for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + if (!worker->GetCompressedBuffers(&compressed_buf)) { + return {}; + } + } + } + for (size_t i = 0; i < num_blocks; i++) { + auto& block = compressed_buf[i]; + if (block.size() >= header_.block_size) { + block.resize(header_.block_size); + std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size); + } + } + + return compressed_buf; +} + bool CowWriterV3::WriteOperation(std::basic_string_view ops, std::basic_string_view data) { const auto total_data_size = diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h index 73ac52011..b19af60a3 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "writer_base.h" @@ -51,12 +52,14 @@ class CowWriterV3 : public CowWriterBase { std::basic_string_view data); bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, uint16_t offset, CowOperationType type); - bool CompressBlocks(size_t num_blocks, const void* data); bool CheckOpCount(size_t op_count); private: + std::vector> CompressBlocks(const size_t num_blocks, + const void* data); bool ReadBackVerification(); bool FlushCacheOps(); + void InitWorkers(); CowHeaderV3 header_{}; CowCompression compression_; // in the case that we are using one thread for compression, we can store and re-use the same @@ -75,6 +78,8 @@ class CowWriterV3 : public CowWriterBase { std::vector cached_ops_; std::vector> cached_data_; std::vector data_vec_; + + std::vector 
threads_; }; } // namespace snapshot