diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp index a8a7716aa..5ceaf287a 100644 --- a/fs_mgr/libsnapshot/Android.bp +++ b/fs_mgr/libsnapshot/Android.bp @@ -108,7 +108,7 @@ cc_library_static { ], srcs: [":libsnapshot_sources"], static_libs: [ - "libfs_mgr_binder" + "libfs_mgr_binder", ], } @@ -128,12 +128,12 @@ cc_library { static_libs: [ "libc++fs", "libsnapshot_cow", - ] + ], } cc_library_static { name: "libsnapshot_init", - native_coverage : true, + native_coverage: true, defaults: ["libsnapshot_defaults"], srcs: [":libsnapshot_sources"], ramdisk_available: true, @@ -204,6 +204,10 @@ cc_library_static { "libsnapshot_cow/writer_v2.cpp", "libsnapshot_cow/writer_v3.cpp", ], + + header_libs: [ + "libstorage_literals_headers", + ], export_include_dirs: ["include"], host_supported: true, recovery_available: true, @@ -243,7 +247,10 @@ cc_library_static { cc_defaults { name: "libsnapshot_test_defaults", - defaults: ["libsnapshot_defaults", "libsnapshot_cow_defaults"], + defaults: [ + "libsnapshot_defaults", + "libsnapshot_cow_defaults", + ], srcs: [ "partition_cow_creator_test.cpp", "snapshot_metadata_updater_test.cpp", @@ -283,10 +290,13 @@ cc_defaults { cc_test { name: "vts_libsnapshot_test", - defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"], + defaults: [ + "libsnapshot_test_defaults", + "libsnapshot_hal_deps", + ], test_suites: [ "vts", - "device-tests" + "device-tests", ], test_options: { min_shipping_api_level: 30, @@ -295,12 +305,15 @@ cc_test { cc_test { name: "vab_legacy_tests", - defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"], + defaults: [ + "libsnapshot_test_defaults", + "libsnapshot_hal_deps", + ], cppflags: [ "-DLIBSNAPSHOT_TEST_VAB_LEGACY", ], test_suites: [ - "device-tests" + "device-tests", ], test_options: { // Legacy VAB launched in Android R. @@ -310,12 +323,15 @@ cc_test { cc_test { name: "vabc_legacy_tests", - defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"], + defaults: [ + "libsnapshot_test_defaults", + "libsnapshot_hal_deps", + ], cppflags: [ "-DLIBSNAPSHOT_TEST_VABC_LEGACY", ], test_suites: [ - "device-tests" + "device-tests", ], test_options: { // Legacy VABC launched in Android S. 
@@ -343,7 +359,10 @@ cc_test { cc_binary { name: "snapshotctl", - defaults: ["libsnapshot_cow_defaults", "libsnapshot_hal_deps"], + defaults: [ + "libsnapshot_cow_defaults", + "libsnapshot_hal_deps", + ], srcs: [ "snapshotctl.cpp", ], @@ -412,8 +431,11 @@ cc_test { "libgtest", "libsnapshot_cow", ], + header_libs: [ + "libstorage_literals_headers", + ], test_suites: [ - "device-tests" + "device-tests", ], test_options: { min_shipping_api_level: 30, diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h index d410c14ee..6865b1944 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h @@ -201,6 +201,12 @@ static constexpr uint64_t kCowOpSourceInfoDataMask = (1ULL << 48) - 1; static constexpr uint64_t kCowOpSourceInfoTypeBit = 60; static constexpr uint64_t kCowOpSourceInfoTypeNumBits = 4; static constexpr uint64_t kCowOpSourceInfoTypeMask = (1ULL << kCowOpSourceInfoTypeNumBits) - 1; + +static constexpr uint64_t kCowOpSourceInfoCompressionBit = 57; +static constexpr uint64_t kCowOpSourceInfoCompressionNumBits = 3; +static constexpr uint64_t kCowOpSourceInfoCompressionMask = + ((1ULL << kCowOpSourceInfoCompressionNumBits) - 1); + // The on disk format of cow (currently == CowOperation) struct CowOperationV3 { // If this operation reads from the data section of the COW, this contains @@ -211,8 +217,8 @@ struct CowOperationV3 { uint32_t new_block; // source_info will have the following layout - // |---4 bits ---| ---12 bits---| --- 48 bits ---| - // |--- type --- | -- unused -- | --- source --- | + // |--- 4 bits -- | --------- 3 bits ------ | --- 9 bits --- | --- 48 bits ---| + // |--- type --- | -- compression factor --| --- unused --- | --- source --- | // // The value of |source| depends on the operation code. // @@ -225,6 +231,17 @@ struct CowOperationV3 { // For ops other than Label: // Bits 47-62 are reserved and must be zero. // A block is compressed if its data is < block_sz + // + // Bits [57-59] represent the compression factor. + // + // Compression - factor + // ========================== + // 000 - 4k + // 001 - 8k + // 010 - 16k + // ... + // 110 - 256k + // uint64_t source_info_; constexpr uint64_t source() const { return source_info_ & kCowOpSourceInfoDataMask; } constexpr void set_source(uint64_t source) { @@ -245,6 +262,20 @@ struct CowOperationV3 { source_info_ |= (static_cast<uint64_t>(type) & kCowOpSourceInfoTypeMask) << kCowOpSourceInfoTypeBit; } + constexpr void set_compression_bits(uint8_t compression_factor) { + // Clear the 3 bits from bit 57 - [57-59] + source_info_ &= ~(kCowOpSourceInfoCompressionMask << kCowOpSourceInfoCompressionBit); + // Set the actual compression factor + source_info_ |= + (static_cast<uint64_t>(compression_factor) & kCowOpSourceInfoCompressionMask) + << kCowOpSourceInfoCompressionBit; + } + constexpr uint8_t compression_bits() const { + // Grab the 3 bits from [57-59] + const auto compression_factor = + (source_info_ >> kCowOpSourceInfoCompressionBit) & kCowOpSourceInfoCompressionMask; + return static_cast<uint8_t>(compression_factor); + } } __attribute__((packed)); // Ensure that getters/setters added to CowOperationV3 do not increase its size @@ -326,5 +357,11 @@ bool IsOrderedOp(const CowOperation& op);
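The packing above is easy to sanity-check. The following standalone sketch (illustration only, not part of the patch; the constants are mirrored from cow_format.h) packs a compression factor into source_info_, unpacks it, and derives the effective compression size and intra-op block offset:

#include <cassert>
#include <cstddef>
#include <cstdint>

// Mirrors of the cow_format.h constants used by the new field.
static constexpr uint64_t kDataMask = (1ULL << 48) - 1;        // bits 0-47: source
static constexpr uint64_t kCompressionBit = 57;                // bits 57-59: factor
static constexpr uint64_t kCompressionMask = (1ULL << 3) - 1;

int main() {
    uint64_t source_info = 0;
    // A 16 KiB compressed region on a 4 KiB-block device spans 4 blocks,
    // so the stored factor is log2(16384 / 4096) = 2, i.e. 0b010.
    source_info |= (static_cast<uint64_t>(2) & kCompressionMask) << kCompressionBit;
    source_info |= (123456ULL & kDataMask);  // byte offset of the data in the COW

    const uint8_t bits = (source_info >> kCompressionBit) & kCompressionMask;
    assert(bits == 2);                            // what compression_bits() returns
    assert((source_info & kDataMask) == 123456);  // what source() returns

    // CowOpCompressionSize(): block_size << bits.
    assert((size_t{4096} << bits) == 16384);

    // GetBlockOffset(): an I/O to new_block + 3 lands 3 blocks into the
    // decompressed buffer, i.e. at byte offset 3 * 4096 = 12288.
    assert((uint64_t{103} - uint64_t{100}) * 4096 == 12288);
    return 0;
}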
// Convert compression name to internal value. std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name); +// Return the block size used for compression +size_t CowOpCompressionSize(const CowOperation* op, size_t block_size); + +// Return the relative offset of the I/O block within the multi-block +// compression region covered by the CowOperation +bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset); } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h index bf4c79f17..3f49c69a8 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h @@ -162,6 +162,9 @@ class CowReader final : public ICowReader { // Creates a clone of the current CowReader without the file handlers std::unique_ptr<CowReader> CloneCowReader(); + // Get the max compression size + uint32_t GetMaxCompressionSize(); + void UpdateMergeOpsCompleted(int num_merge_ops) { header_.num_merge_ops += num_merge_ops; } private: diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h index 0194ffd7b..89699dcee 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h @@ -119,9 +119,9 @@ class ICowWriter { class CompressWorker { public: - CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size); + CompressWorker(std::unique_ptr<ICompressor>&& compressor); bool RunThread(); - void EnqueueCompressBlocks(const void* buffer, size_t num_blocks); + void EnqueueCompressBlocks(const void* buffer, size_t block_size, size_t num_blocks); bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf); void Finalize(); static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression); @@ -134,20 +134,22 @@ class CompressWorker { struct CompressWork { const void* buffer; size_t num_blocks; + size_t block_size; bool compression_status = false; std::vector<std::basic_string<uint8_t>> compressed_data; }; std::unique_ptr<ICompressor> compressor_; - uint32_t block_size_; std::queue<CompressWork> work_queue_; std::queue<CompressWork> compressed_queue_; std::mutex lock_; std::condition_variable cv_; bool stopped_ = false; + size_t total_submitted_ = 0; + size_t total_processed_ = 0; - bool CompressBlocks(const void* buffer, size_t num_blocks, + bool CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size, std::vector<std::basic_string<uint8_t>>* compressed_data); }; diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp index abc7d336f..577cdbd96 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp @@ -208,9 +208,9 @@ class ZstdCompressor final : public ICompressor { std::unique_ptr zstd_context_; }; -bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, +bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size, std::vector<std::basic_string<uint8_t>>* compressed_data) { - return CompressBlocks(compressor_.get(), block_size_, buffer, num_blocks, compressed_data); + return CompressBlocks(compressor_.get(), block_size, buffer, num_blocks, compressed_data); } bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer, @@ -223,7 +223,7 @@ bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, PLOG(ERROR) << "CompressBlocks: Compression failed"; return false; } - if (data.size() > std::numeric_limits::max()) { + if (data.size() > 
std::numeric_limits::max()) { LOG(ERROR) << "Compressed block is too large: " << data.size(); return false; } @@ -254,7 +254,8 @@ bool CompressWorker::RunThread() { } // Compress blocks - bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data); + bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, blocks.block_size, + &blocks.compressed_data); blocks.compression_status = ret; { std::lock_guard lock(lock_); @@ -273,35 +274,31 @@ bool CompressWorker::RunThread() { return true; } -void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) { +void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t block_size, + size_t num_blocks) { { std::lock_guard lock(lock_); CompressWork blocks = {}; blocks.buffer = buffer; + blocks.block_size = block_size; blocks.num_blocks = num_blocks; work_queue_.push(std::move(blocks)); + total_submitted_ += 1; } cv_.notify_all(); } bool CompressWorker::GetCompressedBuffers(std::vector>* compressed_buf) { - { + while (true) { std::unique_lock lock(lock_); - while (compressed_queue_.empty() && !stopped_) { + while ((total_submitted_ != total_processed_) && compressed_queue_.empty() && !stopped_) { cv_.wait(lock); } - - if (stopped_) { - return true; - } - } - - { - std::lock_guard lock(lock_); while (compressed_queue_.size() > 0) { CompressWork blocks = std::move(compressed_queue_.front()); compressed_queue_.pop(); + total_processed_ += 1; if (blocks.compression_status) { compressed_buf->insert(compressed_buf->end(), @@ -312,9 +309,12 @@ bool CompressWorker::GetCompressedBuffers(std::vector return false; } } + if ((total_submitted_ == total_processed_) || stopped_) { + total_submitted_ = 0; + total_processed_ = 0; + return true; + } } - - return true; } std::unique_ptr ICompressor::Brotli(uint32_t compression_level, @@ -344,8 +344,8 @@ void CompressWorker::Finalize() { cv_.notify_all(); } -CompressWorker::CompressWorker(std::unique_ptr&& compressor, uint32_t block_size) - : compressor_(std::move(compressor)), block_size_(block_size) {} +CompressWorker::CompressWorker(std::unique_ptr&& compressor) + : compressor_(std::move(compressor)) {} } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp index 8d1786c56..19014c069 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include "writer_v2.h" #include "writer_v3.h" @@ -28,6 +29,7 @@ namespace android { namespace snapshot { using android::base::unique_fd; +using namespace android::storage_literals; std::ostream& EmitCowTypeString(std::ostream& os, CowOperationType cow_type) { switch (cow_type) { @@ -174,5 +176,36 @@ std::unique_ptr CreateCowEstimator(uint32_t version, const CowOption return CreateCowWriter(version, options, unique_fd{-1}, std::nullopt); } +size_t CowOpCompressionSize(const CowOperation* op, size_t block_size) { + uint8_t compression_bits = op->compression_bits(); + return (block_size << compression_bits); +} + +bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset) { + const uint64_t new_block = op->new_block; + + if (op->type() != kCowReplaceOp || io_block < new_block) { + LOG(VERBOSE) << "Invalid IO request for block: " << io_block + << " CowOperation: new_block: " << new_block; + return false; + } + + // Get the actual compression size + const size_t 
compression_size = CowOpCompressionSize(op, block_size); // Find the number of blocks spanned const size_t num_blocks = compression_size / block_size; // Find the distance of the I/O block from the start of the blocks // this CowOperation encompasses const size_t block_distance = io_block - new_block; // Check if this block is within this range; // if so, return the relative offset if (block_distance < num_blocks) { *offset = block_distance * block_size; return true; } return false; } } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp index 1b4a971ca..651649948 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp @@ -26,6 +26,7 @@ #include #include #include +#include <storage_literals/storage_literals.h> #include #include "cow_decompress.h" @@ -35,6 +36,8 @@ namespace android { namespace snapshot { +using namespace android::storage_literals; + bool ReadCowHeader(android::base::borrowed_fd fd, CowHeaderV3* header) { if (lseek(fd.get(), 0, SEEK_SET) < 0) { PLOG(ERROR) << "lseek header failed"; @@ -161,6 +164,21 @@ bool CowReader::Parse(android::base::borrowed_fd fd, std::optional lab return PrepMergeOps(); } +uint32_t CowReader::GetMaxCompressionSize() { + switch (header_.prefix.major_version) { + case 1: + case 2: + // Old versions support only 4KB compression. + return header_.block_size; + case 3: + return header_.max_compression_size; + default: + LOG(ERROR) << "Unknown version: " << header_.prefix.major_version; + return 0; + } +} + // // This sets up the data needed for MergeOpIter. MergeOpIter presents // data in the order we intend to merge in. @@ -705,6 +723,11 @@ uint8_t CowReader::GetCompressionType() { ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size, size_t ignore_bytes) { std::unique_ptr<IDecompressor> decompressor; + const size_t op_buf_size = CowOpCompressionSize(op, header_.block_size); + if (!op_buf_size) { + LOG(ERROR) << "Compression size is zero. 
op: " << *op; + return -1; + } switch (GetCompressionType()) { case kCowCompressNone: break; @@ -715,12 +738,12 @@ ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_ decompressor = IDecompressor::Brotli(); break; case kCowCompressZstd: - if (header_.block_size != op->data_length) { + if (op_buf_size != op->data_length) { decompressor = IDecompressor::Zstd(); } break; case kCowCompressLz4: - if (header_.block_size != op->data_length) { + if (op_buf_size != op->data_length) { decompressor = IDecompressor::Lz4(); } break; @@ -736,14 +759,14 @@ ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_ offset = op->source(); } if (!decompressor || - ((op->data_length == header_.block_size) && (header_.prefix.major_version == 3))) { + ((op->data_length == op_buf_size) && (header_.prefix.major_version == 3))) { CowDataStream stream(this, offset + ignore_bytes, op->data_length - ignore_bytes); return stream.ReadFully(buffer, buffer_size); } CowDataStream stream(this, offset, op->data_length); decompressor->set_stream(&stream); - return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes); + return decompressor->Decompress(buffer, buffer_size, op_buf_size, ignore_bytes); } bool CowReader::GetSourceOffset(const CowOperation* op, uint64_t* source_offset) { diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp index fe977b723..a35b61404 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp @@ -206,6 +206,8 @@ bool CowParserV2::Translate(TranslatedCowOps* out) { auto& new_op = out->ops->at(i); new_op.set_type(v2_op.type); + // v2 ops always have 4k compression + new_op.set_compression_bits(0); new_op.data_length = v2_op.data_length; if (v2_op.new_block > std::numeric_limits::max()) { diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp index 4e90a0f96..12073fc3e 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp @@ -42,10 +42,21 @@ CompressedSnapshotReader::CompressedSnapshotReader(std::unique_ptr&& op_iter_->Next(); continue; } - if (op->new_block >= ops_.size()) { - ops_.resize(op->new_block + 1, nullptr); + + size_t num_blocks = 1; + if (op->type() == kCowReplaceOp) { + num_blocks = (CowOpCompressionSize(op, block_size_) / block_size_); + } + if (op->new_block >= ops_.size()) { + ops_.resize(op->new_block + num_blocks, nullptr); + } + + size_t vec_index = op->new_block; + while (num_blocks) { + ops_[vec_index] = op; + num_blocks -= 1; + vec_index += 1; } - ops_[op->new_block] = op; op_iter_->Next(); } } @@ -172,11 +183,20 @@ ssize_t CompressedSnapshotReader::ReadBlock(uint64_t chunk, size_t start_offset, } else if (op->type() == kCowZeroOp) { memset(buffer, 0, bytes_to_read); } else if (op->type() == kCowReplaceOp) { - if (cow_->ReadData(op, buffer, bytes_to_read, start_offset) < bytes_to_read) { - LOG(ERROR) << "CompressedSnapshotReader failed to read replace op"; + size_t buffer_size = CowOpCompressionSize(op, block_size_); + uint8_t temp_buffer[buffer_size]; + if (cow_->ReadData(op, temp_buffer, buffer_size, 0) < buffer_size) { + LOG(ERROR) << "CompressedSnapshotReader failed to read replace op: buffer_size: " + << buffer_size << "start_offset: " << start_offset; errno = EIO; return -1; } + off_t block_offset{}; + if (!GetBlockOffset(op, chunk, 
block_size_, &block_offset)) { + LOG(ERROR) << "GetBlockOffset failed"; + return -1; + } + std::memcpy(buffer, (char*)temp_buffer + block_offset + start_offset, bytes_to_read); } else if (op->type() == kCowXorOp) { borrowed_fd fd = GetSourceFd(); if (fd < 0) { diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp index de602138d..3c5b394be 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "writer_v2.h" #include "writer_v3.h" @@ -29,6 +30,9 @@ using testing::AssertionSuccess; namespace android { namespace snapshot { +using namespace android::storage_literals; +using ::testing::TestWithParam; + class CowTestV3 : public ::testing::Test { protected: virtual void SetUp() override { @@ -484,14 +488,14 @@ TEST_F(CowTestV3, BufferMetadataSyncTest) { ASSERT_TRUE(reader.Parse(cow_->fd)); auto header = reader.header_v3(); - ASSERT_EQ(header.sequence_data_count, 0); + ASSERT_EQ(header.sequence_data_count, static_cast(0)); ASSERT_EQ(header.resume_point_count, 0); ASSERT_EQ(header.resume_point_max, 4); writer->AddLabel(0); ASSERT_TRUE(reader.Parse(cow_->fd)); header = reader.header_v3(); - ASSERT_EQ(header.sequence_data_count, 0); + ASSERT_EQ(header.sequence_data_count, static_cast(0)); ASSERT_EQ(header.resume_point_count, 1); ASSERT_EQ(header.resume_point_max, 4); @@ -699,5 +703,189 @@ TEST_F(CowTestV3, CheckOpCount) { ASSERT_FALSE(writer->AddZeroBlocks(0, 19)); } +struct TestParam { + std::string compression; + int block_size; + int num_threads; + size_t cluster_ops; +}; + +class VariableBlockTest : public ::testing::TestWithParam { + protected: + virtual void SetUp() override { + cow_ = std::make_unique(); + ASSERT_GE(cow_->fd, 0) << strerror(errno); + } + + virtual void TearDown() override { cow_ = nullptr; } + + unique_fd GetCowFd() { return unique_fd{dup(cow_->fd)}; } + + std::unique_ptr cow_; +}; + +// Helper to check read sizes. +static inline void ReadBlockData(CowReader& reader, const CowOperation* op, void* buffer, + size_t size) { + size_t block_size = CowOpCompressionSize(op, 4096); + std::string data(block_size, '\0'); + size_t value = reader.ReadData(op, data.data(), block_size); + ASSERT_TRUE(value == block_size); + std::memcpy(buffer, data.data(), size); +} + +TEST_P(VariableBlockTest, VariableBlockCompressionTest) { + const TestParam params = GetParam(); + + CowOptions options; + options.op_count_max = 100000; + options.compression = params.compression; + options.num_compress_threads = params.num_threads; + options.batch_write = true; + options.compression_factor = params.block_size; + options.cluster_ops = params.cluster_ops; + + CowWriterV3 writer(options, GetCowFd()); + + ASSERT_TRUE(writer.Initialize()); + + std::string xor_data = "This is test data-1. Testing xor"; + xor_data.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10)); + + // Large number of blocks + std::string data = "This is test data-2. Testing replace ops"; + data.resize(options.block_size * 2048, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size())); + + std::string data2 = "This is test data-3. Testing replace ops"; + data2.resize(options.block_size * 259, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size())); + + // Test data size is smaller than block size + + // 4k block + std::string data3 = "This is test data-4. 
Testing replace ops"; + data3.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size())); + + // 8k block + std::string data4; + data4.resize(options.block_size * 2, '\0'); + for (size_t i = 0; i < data4.size(); i++) { + data4[i] = static_cast('A' + i / options.block_size); + } + ASSERT_TRUE(writer.AddRawBlocks(10000, data4.data(), data4.size())); + + // 16k block + std::string data5; + data.resize(options.block_size * 4, '\0'); + for (int i = 0; i < data5.size(); i++) { + data5[i] = static_cast('C' + i / options.block_size); + } + ASSERT_TRUE(writer.AddRawBlocks(11000, data5.data(), data5.size())); + + // 64k Random buffer which cannot be compressed + unique_fd rnd_fd(open("/dev/random", O_RDONLY)); + ASSERT_GE(rnd_fd, 0); + std::string random_buffer; + random_buffer.resize(65536, '\0'); + ASSERT_EQ(android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), 65536, 0), true); + ASSERT_TRUE(writer.AddRawBlocks(12000, random_buffer.data(), 65536)); + + ASSERT_TRUE(writer.Finalize()); + + ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); + + CowReader reader; + ASSERT_TRUE(reader.Parse(cow_->fd)); + + auto iter = reader.GetOpIter(); + ASSERT_NE(iter, nullptr); + + while (!iter->AtEnd()) { + auto op = iter->Get(); + + if (op->type() == kCowXorOp) { + std::string sink(xor_data.size(), '\0'); + ASSERT_EQ(op->new_block, 50); + ASSERT_EQ(op->source(), 98314); // 4096 * 24 + 10 + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, xor_data); + } + if (op->type() == kCowReplaceOp) { + if (op->new_block == 100) { + data.resize(options.block_size); + std::string sink(data.size(), '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink.size(), data.size()); + ASSERT_EQ(sink, data); + } + if (op->new_block == 6000) { + data2.resize(options.block_size); + std::string sink(data2.size(), '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, data2); + } + if (op->new_block == 9000) { + std::string sink(data3.size(), '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, data3); + } + if (op->new_block == 10000) { + data4.resize(options.block_size); + std::string sink(options.block_size, '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, data4); + } + if (op->new_block == 11000) { + data5.resize(options.block_size); + std::string sink(options.block_size, '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, data5); + } + if (op->new_block == 12000) { + random_buffer.resize(options.block_size); + std::string sink(options.block_size, '\0'); + ReadBlockData(reader, op, sink.data(), sink.size()); + ASSERT_EQ(sink, random_buffer); + } + } + + iter->Next(); + } +} + +std::vector GetTestConfigs() { + std::vector testParams; + + std::vector block_sizes = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB}; + std::vector compression_algo = {"none", "lz4", "zstd", "gz"}; + std::vector threads = {1, 2}; + // This will also test batch size + std::vector cluster_ops = {1, 256}; + + // This should test 112 combination + for (auto block : block_sizes) { + for (auto compression : compression_algo) { + for (auto thread : threads) { + for (auto cluster : cluster_ops) { + TestParam param; + param.block_size = block; + param.compression = compression; + param.num_threads = thread; + param.cluster_ops = cluster; + testParams.push_back(std::move(param)); + } + } + } + } + + return testParams; +} + 
+INSTANTIATE_TEST_SUITE_P(CompressorsWithVariableBlocks, VariableBlockTest, + ::testing::ValuesIn(GetTestConfigs())); + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp index 75cd1114d..d0864e0a3 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp @@ -185,7 +185,7 @@ void CowWriterV2::InitWorkers() { for (int i = 0; i < num_compress_threads_; i++) { std::unique_ptr compressor = ICompressor::Create(compression_, header_.block_size); - auto wt = std::make_unique(std::move(compressor), header_.block_size); + auto wt = std::make_unique(std::move(compressor)); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); compress_threads_.push_back(std::move(wt)); } @@ -353,7 +353,7 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) { if (i == num_threads - 1) { num_blocks_per_thread = num_blocks; } - worker->EnqueueCompressBlocks(iter, num_blocks_per_thread); + worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread); iter += (num_blocks_per_thread * header_.block_size); num_blocks -= num_blocks_per_thread; } diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp index 251b24e6a..c92460a7c 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -54,6 +55,7 @@ namespace snapshot { static_assert(sizeof(off_t) == sizeof(uint64_t)); +using namespace android::storage_literals; using android::base::unique_fd; // Divide |x| by |y| and round up to the nearest integer. 
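The writer_v2.cpp hunks above show the migration pattern for the reworked CompressWorker API; a condensed usage sketch follows (illustrative only: buffer setup and error handling are assumed, and the factory call mirrors the ICompressor::Create() usage elsewhere in this patch).

// Sketch: per-request block sizes with one worker (assumes the libsnapshot headers).
void CompressMixedSizes(const CowCompression& compression, const uint8_t* data) {
    // The worker no longer takes a block size at construction; each enqueued
    // request carries its own, so 4 KiB and 64 KiB chunks can coexist in flight.
    auto worker = std::make_unique<CompressWorker>(
            ICompressor::Create(compression, 64 * 1024));
    std::thread thread([&]() { worker->RunThread(); });

    worker->EnqueueCompressBlocks(data, 4096, 8);              // eight 4 KiB blocks
    worker->EnqueueCompressBlocks(data + 8 * 4096, 65536, 1);  // one 64 KiB chunk

    // Blocks until total_processed_ catches up with total_submitted_;
    // buffers come back in submission order.
    std::vector<std::basic_string<uint8_t>> out;
    worker->GetCompressedBuffers(&out);  // out.size() == 9 on success

    worker->Finalize();
    thread.join();
}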
@@ -77,9 +79,9 @@ void CowWriterV3::InitWorkers() { threads_.clear(); for (size_t i = 0; i < num_compress_threads_; i++) { std::unique_ptr compressor = - ICompressor::Create(compression_, header_.block_size); + ICompressor::Create(compression_, header_.max_compression_size); auto&& wt = compress_threads_.emplace_back( - std::make_unique(std::move(compressor), header_.block_size)); + std::make_unique(std::move(compressor))); threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); })); } LOG(INFO) << num_compress_threads_ << " thread used for compression"; @@ -111,10 +113,15 @@ void CowWriterV3::SetupHeaders() { header_.op_count_max = 0; header_.compression_algorithm = kCowCompressNone; header_.max_compression_size = options_.compression_factor; - return; } bool CowWriterV3::ParseOptions() { + if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) { + LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size; + return false; + } + + LOG(INFO) << "Compression factor: " << header_.max_compression_size; num_compress_threads_ = std::max(int(options_.num_compress_threads), 1); auto parts = android::base::Split(options_.compression, ","); if (parts.size() > 2) { @@ -154,20 +161,22 @@ bool CowWriterV3::ParseOptions() { compression_.algorithm = *algorithm; if (compression_.algorithm != kCowCompressNone) { - compressor_ = ICompressor::Create(compression_, header_.block_size); + compressor_ = ICompressor::Create(compression_, header_.max_compression_size); if (compressor_ == nullptr) { LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm; return false; } - if (options_.cluster_ops && - (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) || - options_.batch_write)) { - batch_size_ = std::max(options_.cluster_ops, 1); - data_vec_.reserve(batch_size_); - cached_data_.reserve(batch_size_); - cached_ops_.reserve(batch_size_); - } } + + if (options_.cluster_ops && + (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) || + options_.batch_write)) { + batch_size_ = std::max(options_.cluster_ops, 1); + data_vec_.reserve(batch_size_); + cached_data_.reserve(batch_size_); + cached_ops_.reserve(batch_size_); + } + if (batch_size_ > 1) { LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_; } else { @@ -178,6 +187,7 @@ bool CowWriterV3::ParseOptions() { num_compress_threads_ = options_.num_compress_threads; } InitWorkers(); + return true; } @@ -206,6 +216,14 @@ bool CowWriterV3::Initialize(std::optional label) { } } + // TODO: b/322279333 + // Set compression factor to 4k during estimation. + // Once COW estimator is ready to support variable + // block size, this check has to be removed. 
+ if (IsEstimating()) { + header_.max_compression_size = header_.block_size; + } + return true; } @@ -328,6 +346,46 @@ bool CowWriterV3::NeedsFlush() const { return cached_data_.size() >= batch_size_ || cached_ops_.size() >= batch_size_ * 16; } +bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data, + uint64_t old_block, uint16_t offset, + CowOperationType type, size_t blocks_to_write) { + size_t compressed_bytes = 0; + auto&& blocks = CompressBlocks(blocks_to_write, data, type); + if (blocks.empty()) { + LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write + << ", actual number of blocks received from compressor " << blocks.size(); + return false; + } + size_t blocks_written = 0; + for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) { + CowOperation& op = cached_ops_.emplace_back(); + auto& vec = data_vec_.emplace_back(); + CompressedBuffer buffer = std::move(blocks[blk_index]); + auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data)); + op.new_block = new_block_start + blocks_written; + + op.set_type(type); + op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size)); + + if (type == kCowXorOp) { + op.set_source((old_block + blocks_written) * header_.block_size + offset); + } else { + op.set_source(next_data_pos_ + compressed_bytes); + } + + vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()}; + op.data_length = vec.iov_len; + compressed_bytes += op.data_length; + blocks_written += (buffer.compression_factor / header_.block_size); + } + if (blocks_written != blocks_to_write) { + LOG(ERROR) << "Total compressed blocks: " << blocks_written + << " Expected: " << blocks_to_write; + return false; + } + return true; +} + bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, uint16_t offset, CowOperationType type) { if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) { @@ -341,38 +399,21 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t return false; } for (size_t i = 0; i < num_blocks;) { - const auto blocks_to_write = + const size_t blocks_to_write = std::min(batch_size_ - cached_data_.size(), num_blocks - i); - size_t compressed_bytes = 0; - auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i); - if (blocks.size() != blocks_to_write) { - LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", " - << blocks_to_write << ", actual number of blocks received from compressor " - << blocks.size(); + + if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i, + old_block + i, offset, type, blocks_to_write)) { return false; } - for (size_t j = 0; j < blocks_to_write; j++) { - CowOperation& op = cached_ops_.emplace_back(); - auto& vec = data_vec_.emplace_back(); - auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j])); - op.new_block = new_block_start + i + j; - op.set_type(type); - if (type == kCowXorOp) { - op.set_source((old_block + i + j) * header_.block_size + offset); - } else { - op.set_source(next_data_pos_ + compressed_bytes); - } - vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()}; - op.data_length = vec.iov_len; - compressed_bytes += op.data_length; - } if (NeedsFlush() && !FlushCacheOps()) { LOG(ERROR) << "EmitBlocks with compression: write failed. 
new block: " << new_block_start << " compression: " << compression_.algorithm << ", op type: " << type; return false; } + i += blocks_to_write; } @@ -482,55 +523,165 @@ bool CowWriterV3::FlushCacheOps() { return true; } -std::vector> CowWriterV3::CompressBlocks(const size_t num_blocks, - const void* data) { - const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_; - const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); - std::vector> compressed_buf; - compressed_buf.clear(); - const uint8_t* const iter = reinterpret_cast(data); - if (compression_.algorithm == kCowCompressNone) { - for (size_t i = 0; i < num_blocks; i++) { - auto& buf = compressed_buf.emplace_back(); - buf.resize(header_.block_size); - std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size); - } - return compressed_buf; +size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress, + CowOperationType type) const { + // For XOR ops, we don't support bigger block size compression yet. + // For bigger block size support, snapshot-merge also has to changed. We + // aren't there yet; hence, just stick to 4k for now until + // snapshot-merge is ready for XOR operation. + if (type == kCowXorOp) { + return header_.block_size; } - if (num_threads <= 1) { - if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks, - &compressed_buf)) { + + size_t compression_factor = header_.max_compression_size; + while (compression_factor > header_.block_size) { + size_t num_blocks = compression_factor / header_.block_size; + if (blocks_to_compress >= num_blocks) { + return compression_factor; + } + compression_factor >>= 1; + } + return header_.block_size; +} + +std::vector CowWriterV3::ProcessBlocksWithNoCompression( + const size_t num_blocks, const void* data, CowOperationType type) { + size_t blocks_to_compress = num_blocks; + const uint8_t* iter = reinterpret_cast(data); + std::vector compressed_vec; + + while (blocks_to_compress) { + CompressedBuffer buffer; + + const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type); + size_t num_blocks = compression_factor / header_.block_size; + + buffer.compression_factor = compression_factor; + buffer.compressed_data.resize(compression_factor); + + // No compression. Just copy the data as-is. + std::memcpy(buffer.compressed_data.data(), iter, compression_factor); + + compressed_vec.push_back(std::move(buffer)); + blocks_to_compress -= num_blocks; + iter += compression_factor; + } + return compressed_vec; +} + +std::vector CowWriterV3::ProcessBlocksWithCompression( + const size_t num_blocks, const void* data, CowOperationType type) { + size_t blocks_to_compress = num_blocks; + const uint8_t* iter = reinterpret_cast(data); + std::vector compressed_vec; + + while (blocks_to_compress) { + CompressedBuffer buffer; + + const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type); + size_t num_blocks = compression_factor / header_.block_size; + + buffer.compression_factor = compression_factor; + // Compress the blocks + buffer.compressed_data = compressor_->Compress(iter, compression_factor); + if (buffer.compressed_data.empty()) { + PLOG(ERROR) << "Compression failed"; return {}; } - } else { - // Submit the blocks per thread. The retrieval of - // compressed buffers has to be done in the same order. - // We should not poll for completed buffers in a different order as the - // buffers are tightly coupled with block ordering. 
- for (size_t i = 0; i < num_threads; i++) { - CompressWorker* worker = compress_threads_[i].get(); - const auto blocks_in_batch = - std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); - worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size, - blocks_in_batch); + + // Check if the buffer was indeed compressed; if not, store the raw data + if (buffer.compressed_data.size() >= compression_factor) { + buffer.compressed_data.resize(compression_factor); + std::memcpy(buffer.compressed_data.data(), iter, compression_factor); } - for (size_t i = 0; i < num_threads; i++) { - CompressWorker* worker = compress_threads_[i].get(); - if (!worker->GetCompressedBuffers(&compressed_buf)) { - return {}; - } + compressed_vec.push_back(std::move(buffer)); + blocks_to_compress -= num_blocks; + iter += compression_factor; + } + return compressed_vec; +} + +std::vector<CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression( + const size_t num_blocks, const void* data, CowOperationType type) { + const size_t num_threads = num_compress_threads_; + const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads); + const uint8_t* iter = reinterpret_cast<const uint8_t*>(data); + + std::vector<CompressedBuffer> compressed_vec; + // Submit the blocks per thread. The retrieval of + // compressed buffers has to be done in the same order. + // We should not poll for completed buffers in a different order as the + // buffers are tightly coupled with block ordering. + for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread); + // Enqueue the blocks to be compressed for each thread. + while (blocks_in_batch) { + CompressedBuffer buffer; + + const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type); + size_t num_blocks = compression_factor / header_.block_size; + + buffer.compression_factor = compression_factor; + worker->EnqueueCompressBlocks(iter, compression_factor, 1); + compressed_vec.push_back(std::move(buffer)); + blocks_in_batch -= num_blocks; + iter += compression_factor; } } - for (size_t i = 0; i < num_blocks; i++) { + + // Fetch compressed buffers from the threads + std::vector<std::basic_string<uint8_t>> compressed_buf; + compressed_buf.clear(); + for (size_t i = 0; i < num_threads; i++) { + CompressWorker* worker = compress_threads_[i].get(); + if (!worker->GetCompressedBuffers(&compressed_buf)) { + return {}; + } + } + + if (compressed_vec.size() != compressed_buf.size()) { + LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size() + << " - Expected: " << compressed_vec.size(); + return {}; + } + + iter = reinterpret_cast<const uint8_t*>(data); + // Walk through all the compressed buffers + for (size_t i = 0; i < compressed_buf.size(); i++) { + auto& buffer = compressed_vec[i]; auto& block = compressed_buf[i]; - if (block.size() >= header_.block_size) { - block.resize(header_.block_size); - std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size); + size_t block_size = buffer.compression_factor; + // Check if the block was indeed compressed + if (block.size() >= block_size) { + buffer.compressed_data.resize(block_size); + std::memcpy(buffer.compressed_data.data(), iter, block_size); + } else { + // Compressed block + buffer.compressed_data.resize(block.size()); + std::memcpy(buffer.compressed_data.data(), block.data(), block.size()); } + iter += block_size; + } + return compressed_vec; +} + 
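GetCompressionFactor() above greedily picks the largest supported power-of-two chunk that still fits in the remaining run, and ConstructCowOpCompressedBuffers() consumes the run chunk by chunk. The standalone model below (illustrative constants, not patch code) shows how a 29-block replace request decomposes:

#include <cstddef>
#include <cstdio>

// Mirrors GetCompressionFactor() with block_size = 4 KiB and
// max_compression_size = 256 KiB (values assumed for illustration).
static size_t GetFactor(size_t blocks_left) {
    const size_t block_size = 4096, max_factor = 256 * 1024;
    for (size_t factor = max_factor; factor > block_size; factor >>= 1) {
        if (blocks_left >= factor / block_size) return factor;
    }
    return block_size;
}

int main() {
    size_t blocks = 29;
    while (blocks) {
        const size_t factor = GetFactor(blocks);
        // Prints chunks of 16, 8, 4 and 1 block(s): 64 KiB, 32 KiB, 16 KiB, 4 KiB.
        std::printf("chunk: %zu blocks (%zu bytes)\n", factor / 4096, factor);
        blocks -= factor / 4096;
    }
    return 0;
}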
+std::vector<CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks, + const void* data, + CowOperationType type) { + if (compression_.algorithm == kCowCompressNone) { + return ProcessBlocksWithNoCompression(num_blocks, data, type); } - return compressed_buf; + const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_; + + // If no threads are required, just compress the blocks inline. + if (num_threads <= 1) { + return ProcessBlocksWithCompression(num_blocks, data, type); + } + + return ProcessBlocksWithThreadedCompression(num_blocks, data, type); } bool CowWriterV3::WriteOperation(std::basic_string_view ops, diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h index b19af60a3..4915e9c08 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h +++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h @@ -19,11 +19,15 @@ #include #include +#include <algorithm> +#include <storage_literals/storage_literals.h> #include "writer_base.h" namespace android { namespace snapshot { +using namespace android::storage_literals; + class CowWriterV3 : public CowWriterBase { public: explicit CowWriterV3(const CowOptions& options, android::base::unique_fd&& fd); @@ -43,6 +47,10 @@ class CowWriterV3 : public CowWriterBase { virtual bool EmitSequenceData(size_t num_ops, const uint32_t* data) override; private: + struct CompressedBuffer { + size_t compression_factor; + std::basic_string<uint8_t> compressed_data; + }; void SetupHeaders(); bool NeedsFlush() const; bool ParseOptions(); @@ -52,11 +60,38 @@ class CowWriterV3 : public CowWriterBase { std::basic_string_view data); bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, uint16_t offset, CowOperationType type); + bool ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data, + uint64_t old_block, uint16_t offset, CowOperationType type, + size_t blocks_to_write); bool CheckOpCount(size_t op_count); private: - std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks, - const void* data); + std::vector<CompressedBuffer> ProcessBlocksWithNoCompression(const size_t num_blocks, + const void* data, + CowOperationType type); + std::vector<CompressedBuffer> ProcessBlocksWithCompression(const size_t num_blocks, + const void* data, + CowOperationType type); + std::vector<CompressedBuffer> ProcessBlocksWithThreadedCompression(const size_t num_blocks, + const void* data, + CowOperationType type); + std::vector<CompressedBuffer> CompressBlocks(const size_t num_blocks, const void* data, + CowOperationType type); + size_t GetCompressionFactor(const size_t blocks_to_compress, CowOperationType type) const; + + constexpr bool IsBlockAligned(const size_t size) { + // These are the only block sizes supported. Block sizes beyond 256k + // may impact random read performance post OTA boot. + const size_t values[] = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB}; + + auto it = std::lower_bound(std::begin(values), std::end(values), size); + + if (it != std::end(values) && *it == size) { + return true; + } + return false; + } + bool ReadBackVerification(); bool FlushCacheOps(); void InitWorkers(); diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp index 1e7d0c0a2..bd7eaca74 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "merge_worker.h" +#include #include +#include "merge_worker.h" #include "snapuserd_core.h" #include "utility.h" @@ -37,6 +38,7 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops, int num_ops = *pending_ops; int nr_consecutive = 0; bool checkOrderedOp = (replace_zero_vec == nullptr); + size_t num_blocks = 1; do { if (!cowop_iter_->AtEnd() && num_ops) { @@ -48,11 +50,15 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops, *source_offset = cow_op->new_block * BLOCK_SZ; if (!checkOrderedOp) { replace_zero_vec->push_back(cow_op); + if (cow_op->type() == kCowReplaceOp) { + // Get the number of blocks this op has compressed + num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ); + } } cowop_iter_->Next(); - num_ops -= 1; - nr_consecutive = 1; + num_ops -= num_blocks; + nr_consecutive = num_blocks; while (!cowop_iter_->AtEnd() && num_ops) { const CowOperation* op = cowop_iter_->Get(); @@ -66,11 +72,20 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops, } if (!checkOrderedOp) { + if (op->type() == kCowReplaceOp) { + num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ); + if (num_ops < num_blocks) { + break; + } + } else { + // zero op + num_blocks = 1; + } replace_zero_vec->push_back(op); } - nr_consecutive += 1; - num_ops -= 1; + nr_consecutive += num_blocks; + num_ops -= num_blocks; cowop_iter_->Next(); } } @@ -108,18 +123,24 @@ bool MergeWorker::MergeReplaceZeroOps() { for (size_t i = 0; i < replace_zero_vec.size(); i++) { const CowOperation* cow_op = replace_zero_vec[i]; - - void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ); - if (!buffer) { - SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps"; - return false; - } if (cow_op->type() == kCowReplaceOp) { - if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) { + size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ); + void* buffer = bufsink_.AcquireBuffer(buffer_size); + if (!buffer) { + SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps"; + return false; + } + // Read the entire compressed buffer spanning multiple blocks + if (!reader_->ReadData(cow_op, buffer, buffer_size)) { SNAP_LOG(ERROR) << "Failed to read COW in merge"; return false; } } else { + void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ); + if (!buffer) { + SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps"; + return false; + } CHECK(cow_op->type() == kCowZeroOp); memset(buffer, 0, BLOCK_SZ); } @@ -137,7 +158,7 @@ bool MergeWorker::MergeReplaceZeroOps() { return false; } - num_ops_merged += linear_blocks; + num_ops_merged += replace_zero_vec.size(); if (num_ops_merged >= total_ops_merged_per_commit) { // Flush the data diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp index f1d406534..d40b6d11d 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp @@ -14,10 +14,10 @@ * limitations under the License. */ -#include "read_worker.h" - +#include #include +#include "read_worker.h" #include "snapuserd_core.h" #include "utility.h" @@ -48,9 +48,10 @@ ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing // Start the replace operation. This will read the // internal COW format and if the block is compressed, // it will be de-compressed. 
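To ground the PrepareMerge() bookkeeping above before moving into the read path: a replace op that compresses N blocks now consumes N units of the pending-ops budget in one step. A standalone model (assumed op sizes; not patch code):

#include <cassert>
#include <cstddef>

// Toy model of PrepareMerge()'s new accounting: each replace op consumes
// (op_size / BLOCK_SZ) pending-op slots at once instead of a single slot.
int main() {
    const size_t BLOCK_SZ = 4096;
    // Compression sizes of three consecutive replace ops: 16 KiB, 8 KiB, 4 KiB.
    const size_t op_sizes[] = {16384, 8192, 4096};

    int pending_ops = 8;  // budget for this merge window
    int nr_consecutive = 0;
    for (size_t op_size : op_sizes) {
        const int num_blocks = static_cast<int>(op_size / BLOCK_SZ);
        if (pending_ops < num_blocks) break;  // op would not fit; stop here
        nr_consecutive += num_blocks;
        pending_ops -= num_blocks;
    }
    // 4 + 2 + 1 = 7 blocks merged; one slot of the budget remains.
    assert(nr_consecutive == 7 && pending_ops == 1);
    return 0;
}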
-bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) { - if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) { - SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block; +bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size) { + if (!reader_->ReadData(cow_op, buffer, buffer_size)) { + SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block + << " buffer_size: " << buffer_size; return false; } return true; @@ -183,7 +184,13 @@ bool ReadWorker::ProcessCowOp(const CowOperation* cow_op, void* buffer) { switch (cow_op->type()) { case kCowReplaceOp: { - return ProcessReplaceOp(cow_op, buffer); + size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ); + uint8_t chunk[buffer_size]; + if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) { + return false; + } + std::memcpy(buffer, chunk, BLOCK_SZ); + return true; } case kCowZeroOp: { @@ -209,6 +216,13 @@ bool ReadWorker::Init() { return false; } + const size_t compression_factor = reader_->GetMaxCompressionSize(); + if (!compression_factor) { + SNAP_LOG(ERROR) << "Compression factor is set to 0 which is invalid."; + return false; + } + decompressed_buffer_ = std::make_unique(compression_factor); + backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY)); if (backing_store_fd_ < 0) { SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_; @@ -276,6 +290,20 @@ bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t re return true; } +bool ReadWorker::GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block, + off_t* block_offset) { + // If this is a replace op, get the block offset of this I/O + // block. Multi-block compression is supported only for + // Replace ops. + // + // Note: This can be extended when we support COPY and XOR ops down the + // line as the blocks are mostly contiguous. + if (cow_op && cow_op->type() == kCowReplaceOp) { + return GetBlockOffset(cow_op, io_block, BLOCK_SZ, block_offset); + } + return false; +} + bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { size_t remaining_size = sz; std::vector>& chunk_vec = snapuserd_->GetChunkVec(); @@ -286,7 +314,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size); size_t total_bytes_read = 0; - + const CowOperation* prev_op = nullptr; while (read_size) { // We need to check every 4k block to verify if it is // present in the mapping. @@ -294,7 +322,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr), SnapshotHandler::compare); - bool not_found = (it == chunk_vec.end() || it->first != sector); + const bool sector_not_found = (it == chunk_vec.end() || it->first != sector); void* buffer = block_server_->GetResponseBuffer(BLOCK_SZ, size); if (!buffer) { @@ -302,15 +330,88 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { return false; } - if (not_found) { - // Block not found in map - which means this block was not - // changed as per the OTA. Just route the I/O to the base - // device. - if (!ReadDataFromBaseDevice(sector, buffer, size)) { - SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed"; - return false; + if (sector_not_found) { + // Find the 4k block + uint64_t io_block = SectorToChunk(sector); + // Get the previous iterator. 
Since the vector is sorted, the + // lookup of this sector can fall in a range of blocks if + // CowOperation has compressed multiple blocks. + if (it != chunk_vec.begin()) { + std::advance(it, -1); } + bool is_mapping_present = true; + + // Vector itself is empty. This can happen if the block was not + // changed per the OTA or if the merge was already complete but + // snapshot table was not yet collapsed. + if (it == chunk_vec.end()) { + is_mapping_present = false; + } + + const CowOperation* cow_op = nullptr; + // Relative offset within the compressed multiple blocks + off_t block_offset = 0; + if (is_mapping_present) { + // Get the nearest operation found in the vector + cow_op = it->second; + is_mapping_present = GetCowOpBlockOffset(cow_op, io_block, &block_offset); + } + + // Thus, we have a case wherein the sector was not found in the sorted + // vector; however, we indeed have a mapping of this sector + // embedded in one of the CowOperations that spans multiple + // blocks. + if (is_mapping_present) { + // block_offset = 0 would mean that the CowOperation should + // already be in the sorted vector. Hence, lookup should + // have already found it. If not, this is a bug. + if (block_offset == 0) { + SNAP_LOG(ERROR) + << "GetBlockOffset returned offset 0 for io_block: " << io_block; + return false; + } + + // Get the CowOperation's actual compression size + size_t compression_size = CowOpCompressionSize(cow_op, BLOCK_SZ); + // Offset cannot be greater than the compression size + if (block_offset > compression_size) { + SNAP_LOG(ERROR) << "Invalid I/O block found. io_block: " << io_block + << " CowOperation-new-block: " << cow_op->new_block + << " compression-size: " << compression_size; + return false; + } + + // Cached copy from the previous iteration. Just retrieve the + // data + if (prev_op && prev_op->new_block == cow_op->new_block) { + std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size); + } else { + // Get the data from the disk based on the compression + // size + if (!ProcessReplaceOp(cow_op, decompressed_buffer_.get(), + compression_size)) { + return false; + } + // Copy the data from the decompressed buffer relative + // to the i/o block offset. + std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size); + // Cache this CowOperation pointer for successive I/O + // operations. Since the request is sequential and the + // block is already decompressed, subsequent I/O blocks + // can fetch the data directly from this decompressed + // buffer. + prev_op = cow_op; + } + } else { + // Block not found in map - which means this block was not + // changed as per the OTA. Just route the I/O to the base + // device. + if (!ReadDataFromBaseDevice(sector, buffer, size)) { + SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed"; + return false; + } + } ret = size; } else { // We found the sector in mapping. 
Check the type of COW OP and @@ -341,12 +442,50 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { return true; } +bool ReadWorker::IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset, + loff_t cow_op_offset) { + const bool replace_op = (cow_op->type() == kCowReplaceOp); + if (replace_op) { + size_t max_compressed_size = CowOpCompressionSize(cow_op, BLOCK_SZ); + if ((requested_offset >= cow_op_offset) && + (requested_offset < (cow_op_offset + max_compressed_size))) { + return true; + } + } + return false; +} + int ReadWorker::ReadUnalignedSector( sector_t sector, size_t size, std::vector>::iterator& it) { SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size << " Aligned sector: " << it->first; + loff_t requested_offset = sector << SECTOR_SHIFT; + loff_t final_offset = (it->first) << SECTOR_SHIFT; + + const CowOperation* cow_op = it->second; + if (IsMappingPresent(cow_op, requested_offset, final_offset)) { + size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ); + uint8_t chunk[buffer_size]; + // Read the entire decompressed buffer based on the block-size + if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) { + return -1; + } + size_t skip_offset = (requested_offset - final_offset); + size_t write_sz = std::min(size, buffer_size - skip_offset); + + auto buffer = + reinterpret_cast(block_server_->GetResponseBuffer(BLOCK_SZ, write_sz)); + if (!buffer) { + SNAP_LOG(ERROR) << "ReadUnalignedSector failed to allocate buffer"; + return -1; + } + + std::memcpy(buffer, (char*)chunk + skip_offset, write_sz); + return write_sz; + } + int num_sectors_skip = sector - it->first; size_t skip_size = num_sectors_skip << SECTOR_SHIFT; size_t write_size = std::min(size, BLOCK_SZ - skip_size); @@ -445,8 +584,11 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) { size_t remaining_size = size; int ret = 0; + + const CowOperation* cow_op = it->second; if (!merge_complete && (requested_offset >= final_offset) && - (requested_offset - final_offset) < BLOCK_SZ) { + (((requested_offset - final_offset) < BLOCK_SZ) || + IsMappingPresent(cow_op, requested_offset, final_offset))) { // Read the partial un-aligned data ret = ReadUnalignedSector(sector, remaining_size, it); if (ret < 0) { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h index 1aff50c08..43e896a9a 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h @@ -44,9 +44,12 @@ class ReadWorker : public Worker, public IBlockServer::Delegate { bool ProcessXorOp(const CowOperation* cow_op, void* buffer); bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer); bool ProcessCopyOp(const CowOperation* cow_op, void* buffer); - bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer); + bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size); bool ProcessZeroOp(void* buffer); + bool IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset, + loff_t cow_op_offset); + bool GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block, off_t* block_offset); bool ReadAlignedSector(sector_t sector, size_t sz); bool ReadUnalignedSector(sector_t sector, size_t size); int ReadUnalignedSector(sector_t sector, size_t size, @@ -56,6 +59,7 @@ class ReadWorker : public Worker, public IBlockServer::Delegate { constexpr bool IsBlockAligned(size_t size) { return ((size & 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
index 1aff50c08..43e896a9a 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -44,9 +44,12 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
     bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
     bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
     bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
-    bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
+    bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size);
     bool ProcessZeroOp(void* buffer);
 
+    bool IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset,
+                          loff_t cow_op_offset);
+    bool GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block, off_t* block_offset);
     bool ReadAlignedSector(sector_t sector, size_t sz);
     bool ReadUnalignedSector(sector_t sector, size_t size);
     int ReadUnalignedSector(sector_t sector, size_t size,
@@ -56,6 +59,7 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
     constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
     constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+    constexpr chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
 
     std::string backing_store_device_;
     unique_fd backing_store_fd_;
@@ -67,6 +71,7 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
 
     std::basic_string xor_buffer_;
     std::unique_ptr aligned_buffer_;
+    std::unique_ptr decompressed_buffer_;
 };
 
 }  // namespace snapshot
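
Both read paths size their scratch buffers via CowOpCompressionSize(), which is defined elsewhere in libsnapshot. A plausible one-line sketch, assuming the op stores a power-of-two compression factor in its compression bits (0 -> 4k, 1 -> 8k, ...):

    // Sketch under the stated assumption; not necessarily the real definition.
    size_t CowOpCompressionSize(const CowOperation* op, size_t block_size) {
        return block_size << op->compression_bits();
    }

Under that reading, the new SectorToChunk() helper is simply the inverse of ChunkToSector(), letting the worker map a 512-byte sector index back to its owning 4k chunk before searching the sorted vector.
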
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 8ddb0f423..76b44b48b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -64,6 +64,9 @@ using ::testing::TestWithParam;
 struct TestParam {
     bool io_uring;
     bool o_direct;
+    std::string compression;
+    int block_size;
+    int num_threads;
 };
 
 class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
@@ -74,6 +77,7 @@ class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
     void CreateCowDevice();
     void SetDeviceControlName();
     std::unique_ptr<ICowWriter> CreateCowDeviceInternal();
+    std::unique_ptr<ICowWriter> CreateV3Cow();
 
     std::unique_ptr<ITestHarness> harness_;
     size_t size_ = 10_MiB;
@@ -133,6 +137,24 @@ std::unique_ptr<ICowWriter> SnapuserdTestBase::CreateCowDeviceInternal() {
     return CreateCowWriter(kDefaultCowVersion, options, std::move(fd));
 }
 
+std::unique_ptr<ICowWriter> SnapuserdTestBase::CreateV3Cow() {
+    const TestParam params = GetParam();
+
+    CowOptions options;
+    options.op_count_max = 100000;
+    options.compression = params.compression;
+    options.num_compress_threads = params.num_threads;
+    options.batch_write = true;
+    options.compression_factor = params.block_size;
+
+    cow_system_ = std::make_unique<TemporaryFile>();
+
+    unique_fd fd(cow_system_->fd);
+    cow_system_->fd = -1;
+
+    return CreateCowWriter(3, options, std::move(fd));
+}
+
 void SnapuserdTestBase::CreateCowDevice() {
     unique_fd rnd_fd;
     loff_t offset = 0;
@@ -236,6 +258,7 @@ class SnapuserdTest : public SnapuserdTestBase {
     void SetupOrderedOpsInverted();
     void SetupCopyOverlap_1();
     void SetupCopyOverlap_2();
+    void SetupDeviceForPassthrough();
     bool Merge();
     void ValidateMerge();
     void ReadSnapshotDeviceAndValidate();
@@ -258,6 +281,9 @@ class SnapuserdTest : public SnapuserdTestBase {
 
     void SimulateDaemonRestart();
 
+    void CreateCowDeviceWithNoBlockChanges();
+    void ValidateDeviceWithNoBlockChanges();
+
     void CreateCowDeviceOrderedOps();
     void CreateCowDeviceOrderedOpsInverted();
     void CreateCowDeviceWithCopyOverlap_1();
@@ -307,6 +333,12 @@ void SnapuserdTest::SetupOrderedOps() {
     ASSERT_NO_FATAL_FAILURE(SetupDaemon());
 }
 
+void SnapuserdTest::SetupDeviceForPassthrough() {
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(CreateCowDeviceWithNoBlockChanges());
+    ASSERT_NO_FATAL_FAILURE(SetupDaemon());
+}
+
 void SnapuserdTest::SetupOrderedOpsInverted() {
     ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
     ASSERT_NO_FATAL_FAILURE(CreateCowDeviceOrderedOpsInverted());
@@ -480,6 +512,47 @@ void SnapuserdTest::CreateCowDeviceWithCopyOverlap_2() {
     }
 }
 
+void SnapuserdTest::CreateCowDeviceWithNoBlockChanges() {
+    auto writer = CreateCowDeviceInternal();
+    ASSERT_NE(writer, nullptr);
+
+    std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(BLOCK_SZ);
+    std::memset(buffer.get(), 'A', BLOCK_SZ);
+
+    // This test focuses on changing only a couple of blocks, thereby
+    // validating the pass-through I/O path for the rest.
+
+    // Replace the first block
+    ASSERT_TRUE(writer->AddRawBlocks(1, buffer.get(), BLOCK_SZ));
+
+    // Add a zero op for block 3
+    ASSERT_TRUE(writer->AddZeroBlocks(3, 1));
+
+    ASSERT_TRUE(writer->Finalize());
+    orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
+
+    // Read the entire base device
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
+              true);
+
+    off_t offset = BLOCK_SZ;
+    std::memcpy(orig_buffer_.get() + offset, buffer.get(), BLOCK_SZ);
+    offset = 3 * BLOCK_SZ;
+    std::memset(orig_buffer_.get() + offset, 0, BLOCK_SZ);
+}
+
+void SnapuserdTest::ValidateDeviceWithNoBlockChanges() {
+    unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
+    ASSERT_GE(fd, 0);
+    std::unique_ptr<uint8_t[]> snapshot_buffer = std::make_unique<uint8_t[]>(size_);
+    std::memset(snapshot_buffer.get(), 'B', size_);
+
+    // All I/O requests should be a pass-through to the base device except
+    // for blocks 1 and 3.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+}
+
 void SnapuserdTest::CreateCowDeviceWithCopyOverlap_1() {
     auto writer = CreateCowDeviceInternal();
     ASSERT_NE(writer, nullptr);
@@ -781,6 +854,20 @@ void SnapuserdTest::MergeInterrupt() {
     ASSERT_TRUE(Merge());
 }
 
+TEST_P(SnapuserdTest, Snapshot_Passthrough) {
+    if (!harness_->HasUserDevice()) {
+        GTEST_SKIP() << "Skipping snapshot read; not supported";
+    }
+    ASSERT_NO_FATAL_FAILURE(SetupDeviceForPassthrough());
+    // I/O before merge
+    ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
+    ASSERT_TRUE(Merge());
+    ValidateMerge();
+    // I/O after merge - the daemon should read directly
+    // from the base device
+    ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
+}
+
 TEST_P(SnapuserdTest, Snapshot_IO_TEST) {
     if (!harness_->HasUserDevice()) {
         GTEST_SKIP() << "Skipping snapshot read; not supported";
@@ -853,7 +940,7 @@ TEST_P(SnapuserdTest, Snapshot_COPY_Overlap_Merge_Resume_IO_Validate_TEST) {
         GTEST_SKIP() << "Skipping snapshot read; not supported";
     }
     ASSERT_NO_FATAL_FAILURE(SetupCopyOverlap_2());
-    ASSERT_NO_FATAL_FAILURE(MergeInterruptAndValidate(2));
+    ASSERT_NO_FATAL_FAILURE(MergeInterruptFixed(300));
    ValidateMerge();
 }
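
For reference, the pass-through route validated by Snapshot_Passthrough above bottoms out in ReadDataFromBaseDevice(), which is not part of this diff. A minimal sketch under stated assumptions (a pread-style helper over an assumed base-device fd, 512-byte sectors):

    #include <android-base/file.h>

    // Illustrative only: an unmapped sector is served by reading the same
    // byte range from the base device. |base_fd| and the shift of 9 are
    // assumptions for this sketch, not the patch's exact code.
    bool ReadDataFromBaseDevice(int base_fd, uint64_t sector, void* buffer, size_t size) {
        const off64_t offset = static_cast<off64_t>(sector) << 9;  // sectors to bytes
        return android::base::ReadFullyAtOffset(base_fd, buffer, size, offset);
    }
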
/ BLOCK_SZ); + } + // REPLACE ops + ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size())); + + total_blocks_to_write -= total_replace_blocks; + source_block = source_block + total_replace_blocks; + + // ZERO ops + size_t total_zero_blocks = total_blocks_to_write / 3; + ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks)); + + total_blocks_to_write -= total_zero_blocks; + source_block = source_block + total_zero_blocks; + + // Generate some random data wherein few blocks cannot be compressed. + // This is to test the I/O path for those blocks which aren't compressed. + size_t total_random_data_blocks = total_blocks_to_write / 2; + unique_fd rnd_fd(open("/dev/random", O_RDONLY)); + + ASSERT_GE(rnd_fd, 0); + std::string random_buffer; + random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0'); + ASSERT_EQ( + android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0), + true); + // REPLACE ops + ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size())); + + total_blocks_to_write -= total_random_data_blocks; + source_block = source_block + total_random_data_blocks; + + // XOR ops will always be 4k blocks + std::string xor_buffer; + xor_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0'); + for (size_t i = 0; i < xor_buffer.size(); i++) { + xor_buffer[i] = static_cast('C' + i / BLOCK_SZ); + } + size_t xor_offset = 21; + std::string source_buffer; + source_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0'); + ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, source_buffer.data(), source_buffer.size(), + size_ + xor_offset), + true); + for (size_t i = 0; i < xor_buffer.size(); i++) { + xor_buffer[i] ^= source_buffer[i]; + } + + ASSERT_EQ(xor_block_start, source_block); + + ASSERT_TRUE(writer->AddXorBlocks(source_block, xor_buffer.data(), xor_buffer.size(), + (size_ / BLOCK_SZ), xor_offset)); + // Flush operations + ASSERT_TRUE(writer->Finalize()); + + // Construct the buffer required for validation + orig_buffer_ = std::make_unique(total_base_size_); + + // Read the entire base device + ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0), + true); + + // REPLACE ops which are compressed + std::memcpy(orig_buffer_.get(), data.data(), data.size()); + size_t offset = data.size(); + + // ZERO ops + std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0); + std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size()); + offset += zero_buffer.size(); + + // REPLACE ops - Random buffers which aren't compressed + std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size()); + offset += random_buffer.size(); + + // XOR Ops which default to 4k block size compression irrespective of + // compression factor + ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, (char*)orig_buffer_.get() + offset, + xor_buffer.size(), size_ + xor_offset), + true); + for (size_t i = 0; i < xor_buffer.size(); i++) { + orig_buffer_.get()[offset + i] = (uint8_t)(orig_buffer_.get()[offset + i] ^ xor_buffer[i]); + } +} + +void SnapuserdVariableBlockSizeTest::ReadSnapshotWithVariableBlockSize() { + unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY | O_DIRECT)); + ASSERT_GE(fd, 0); + + void* addr; + ssize_t page_size = getpagesize(); + ASSERT_EQ(posix_memalign(&addr, page_size, size_), 0); + std::unique_ptr snapshot_buffer(addr, ::free); + + const TestParam params = GetParam(); + + // Issue I/O request with 
+
+void SnapuserdVariableBlockSizeTest::ReadSnapshotWithVariableBlockSize() {
+    unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY | O_DIRECT));
+    ASSERT_GE(fd, 0);
+
+    void* addr;
+    ssize_t page_size = getpagesize();
+    ASSERT_EQ(posix_memalign(&addr, page_size, size_), 0);
+    std::unique_ptr<void, decltype(&::free)> snapshot_buffer(addr, ::free);
+
+    const TestParam params = GetParam();
+
+    // Issue I/O requests with various block sizes
+    size_t num_blocks = size_ / params.block_size;
+    off_t offset = 0;
+    for (size_t i = 0; i < num_blocks; i++) {
+        ASSERT_EQ(ReadFullyAtOffset(fd, (char*)snapshot_buffer.get() + offset, params.block_size,
+                                    offset),
+                  true);
+        offset += params.block_size;
+    }
+    // Validate the buffer
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read one full chunk in a single shot and re-validate.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Buffered I/O test
+    fd.reset(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
+    ASSERT_GE(fd, 0);
+
+    // Try not to cache
+    posix_fadvise(fd.get(), 0, size_, POSIX_FADV_DONTNEED);
+
+    size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
+    offset = num_blocks_per_op * BLOCK_SZ;
+    size_t read_size = 1019;  // bytes
+    offset -= 111;
+
+    // Issue an un-aligned read which crosses the boundary between a REPLACE
+    // block and a ZERO block.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    offset = (num_blocks_per_op * 3) * BLOCK_SZ;
+    offset -= (BLOCK_SZ - 119);
+    read_size = 8111;
+
+    // Issue an un-aligned read which crosses the boundary between a REPLACE
+    // block of random un-compressed data and a XOR block
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read just one byte at an odd offset which is a REPLACE op
+    offset = 19;
+    read_size = 1;
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read a block which has no mapping to a COW operation. This read should
+    // be a pass-through to the underlying base device.
+    offset = size_ + 9342;
+    read_size = 30;
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+}
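
The magic offsets in ReadSnapshotWithVariableBlockSize() are chosen to straddle op-type boundaries. A standalone check of that arithmetic, under the same assumed defaults as above:

    #include <cstdint>

    constexpr uint64_t kBlockSz = 4096;
    constexpr uint64_t kBlocksPerOp = (10ULL * 1024 * 1024 / kBlockSz) / 4;  // 640
    constexpr uint64_t kZeroStart = kBlocksPerOp * kBlockSz;                 // 2621440
    constexpr uint64_t kXorStart = 3 * kBlocksPerOp * kBlockSz;              // 7864320
    // 1019 bytes starting 111 bytes before the REPLACE/ZERO boundary:
    static_assert(kZeroStart - 111 + 1019 == kZeroStart + 908,
                  "crosses 908 bytes into the ZERO region");
    // 8111 bytes starting (BLOCK_SZ - 119) bytes before the XOR boundary:
    static_assert(kXorStart - (kBlockSz - 119) + 8111 == kXorStart + 4134,
                  "crosses 4134 bytes into the XOR region");
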
+
+void SnapuserdVariableBlockSizeTest::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTest::SetUp());
+}
+
+void SnapuserdVariableBlockSizeTest::TearDown() {
+    SnapuserdTest::TearDown();
+}
+
+TEST_P(SnapuserdVariableBlockSizeTest, Snapshot_Test_Variable_Block_Size) {
+    if (!harness_->HasUserDevice()) {
+        GTEST_SKIP() << "Skipping snapshot read; not supported";
+    }
+    ASSERT_NO_FATAL_FAILURE(SetupCowV3ForVariableBlockSize());
+    ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
+    ASSERT_TRUE(StartMerge());
+    CheckMergeCompletion();
+    ValidateMerge();
+    ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
+}
+
 class HandlerTest : public SnapuserdTestBase {
   protected:
     void SetUp() override;
     void TearDown() override;
 
+    void SetUpV2Cow();
+    void InitializeDevice();
     AssertionResult ReadSectors(sector_t sector, uint64_t size, void* buffer);
 
     TestBlockServerFactory factory_;
@@ -896,10 +1215,11 @@ class HandlerTest : public SnapuserdTestBase {
     std::future<bool> handler_thread_;
 };
 
-void HandlerTest::SetUp() {
-    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
-    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+void HandlerTest::SetUpV2Cow() {
     ASSERT_NO_FATAL_FAILURE(CreateCowDevice());
+}
+
+void HandlerTest::InitializeDevice() {
     ASSERT_NO_FATAL_FAILURE(SetDeviceControlName());
 
     opener_ = factory_.CreateTestOpener(system_device_ctrl_name_);
@@ -921,6 +1241,13 @@ void HandlerTest::SetUp() {
     handler_thread_ = std::async(std::launch::async, &SnapshotHandler::Start, handler_.get());
 }
 
+void HandlerTest::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(SetUpV2Cow());
+    ASSERT_NO_FATAL_FAILURE(InitializeDevice());
+}
+
 void HandlerTest::TearDown() {
     ASSERT_TRUE(factory_.DeleteQueue(system_device_ctrl_name_));
     ASSERT_TRUE(handler_thread_.get());
@@ -986,6 +1313,147 @@ TEST_P(HandlerTest, ReadUnalignedSize) {
     ASSERT_EQ(memcmp(snapuserd_buffer.get(), orig_buffer_.get(), SECTOR_SIZE), 0);
 }
 
+class HandlerTestV3 : public HandlerTest {
+  public:
+    void ReadSnapshotWithVariableBlockSize();
+
+  protected:
+    void SetUp() override;
+    void TearDown() override;
+    void SetUpV3Cow();
+};
+
+void HandlerTestV3::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(SetUpV3Cow());
+    ASSERT_NO_FATAL_FAILURE(InitializeDevice());
+}
+
+void HandlerTestV3::TearDown() {
+    ASSERT_NO_FATAL_FAILURE(HandlerTest::TearDown());
+}
+
+void HandlerTestV3::SetUpV3Cow() {
+    auto writer = CreateV3Cow();
+
+    size_t total_data_to_write = size_;
+
+    size_t total_blocks_to_write = total_data_to_write / BLOCK_SZ;
+    size_t num_blocks_per_op = total_blocks_to_write / 4;
+    size_t source_block = 0;
+
+    size_t total_replace_blocks = num_blocks_per_op;
+    // Write some data which can be compressed
+    std::string data;
+    data.resize(total_replace_blocks * BLOCK_SZ, '\0');
+    for (size_t i = 0; i < data.size(); i++) {
+        data[i] = static_cast<char>('A' + i / BLOCK_SZ);
+    }
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size()));
+
+    total_blocks_to_write -= total_replace_blocks;
+    source_block = source_block + total_replace_blocks;
+
+    // ZERO ops
+    size_t total_zero_blocks = total_blocks_to_write / 3;
+    ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks));
+
+    total_blocks_to_write -= total_zero_blocks;
+    source_block = source_block + total_zero_blocks;
+
+    // Generate some random data wherein a few blocks cannot be compressed.
+    // This is to test the I/O path for those blocks which aren't compressed.
+    size_t total_random_data_blocks = total_blocks_to_write;
+    unique_fd rnd_fd(open("/dev/random", O_RDONLY));
+
+    ASSERT_GE(rnd_fd, 0);
+    std::string random_buffer;
+    random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0');
+    ASSERT_EQ(
+            android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0),
+            true);
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size()));
+    // Flush operations
+    ASSERT_TRUE(writer->Finalize());
+
+    // Construct the buffer required for validation
+    orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
+
+    // Read the entire base device
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
+              true);
+
+    // REPLACE ops which are compressed
+    std::memcpy(orig_buffer_.get(), data.data(), data.size());
+    size_t offset = data.size();
+
+    // ZERO ops
+    std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0);
+    std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size());
+    offset += zero_buffer.size();
+
+    // REPLACE ops - random buffers which aren't compressed
+    std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size());
+}
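
The sector numbers quoted in the Read test below fall out of the same assumed defaults (SECTOR_SIZE = 512, BLOCK_SZ = 4096, size_ = 10 MiB); a standalone check:

    #include <cstdint>

    constexpr uint64_t kSectorSize = 512;
    constexpr uint64_t kBlockSz = 4096;
    // offset = 2 * BLOCK_SZ - SECTOR_SIZE = 7680 bytes => sector 15.
    static_assert((2 * kBlockSz - kSectorSize) / kSectorSize == 15, "sector 15");
    // The compressible REPLACE region spans 640 blocks; two sectors before
    // its end: (640 * 4096 - 2 * 512) / 512 == 5118, the sector quoted below.
    static_assert((640 * kBlockSz - 2 * kSectorSize) / kSectorSize == 5118, "sector 5118");
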
+
+TEST_P(HandlerTestV3, Read) {
+    std::unique_ptr<uint8_t[]> snapuserd_buffer = std::make_unique<uint8_t[]>(size_);
+
+    size_t read_size = SECTOR_SIZE;
+    off_t offset = 0;
+    // Read the first sector
+    ASSERT_TRUE(ReadSectors(1, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), orig_buffer_.get(), read_size), 0);
+
+    // Read the second block at offset 7680 (sector 15). This will map to the
+    // first COW operation for variable block size
+    offset += (((BLOCK_SZ * 2) - SECTOR_SIZE));
+    read_size = BLOCK_SZ;  // Span across two REPLACE ops
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Fill some other data since we are going to read zero blocks
+    std::memset(snapuserd_buffer.get(), 'Z', size_);
+
+    size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
+    offset = num_blocks_per_op * BLOCK_SZ;
+    // Issue a read spanning between a REPLACE op and ZERO ops. The starting
+    // point is the last REPLACE op at sector 5118
+    offset -= (SECTOR_SIZE * 2);
+    // This will make sure it falls back to aligned reads after reading the
+    // first unaligned block
+    read_size = BLOCK_SZ * 6;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Issue an I/O request at the last block. The first chunk of
+    // (SECTOR_SIZE * 2) will be from a REPLACE op which has random buffers
+    offset = (size_ - (SECTOR_SIZE * 2));
+    // The request will span beyond the COW mapping, thereby fetching data
+    // from the base device.
+    read_size = BLOCK_SZ * 8;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Issue I/O requests which are not mapped to any COW operation
+    offset = (size_ + (SECTOR_SIZE * 3));
+    read_size = BLOCK_SZ * 3;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+}
+
 std::vector<bool> GetIoUringConfigs() {
 #if __ANDROID__
     if (!android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false)) {
@@ -1018,6 +1486,37 @@ std::vector<TestParam> GetTestConfigs() {
     return testParams;
 }
 
+std::vector<TestParam> GetVariableBlockTestConfigs() {
+    std::vector<TestParam> testParams;
+
+    std::vector<int> block_sizes = {4096, 8192, 16384, 32768, 65536, 131072};
+    std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
+    std::vector<int> threads = {1, 2};
+    std::vector<bool> uring_configs = GetIoUringConfigs();
+
+    // This should test 96 combinations and validate the I/O path
+    for (auto block : block_sizes) {
+        for (auto compression : compression_algo) {
+            for (auto thread : threads) {
+                for (auto io_uring : uring_configs) {
+                    TestParam param;
+                    param.block_size = block;
+                    param.compression = compression;
+                    param.num_threads = thread;
+                    param.io_uring = io_uring;
+                    param.o_direct = false;
+                    testParams.push_back(std::move(param));
+                }
+            }
+        }
+    }
+
+    return testParams;
+}
+
+INSTANTIATE_TEST_SUITE_P(Io, SnapuserdVariableBlockSizeTest,
+                         ::testing::ValuesIn(GetVariableBlockTestConfigs()));
+INSTANTIATE_TEST_SUITE_P(Io, HandlerTestV3, ::testing::ValuesIn(GetVariableBlockTestConfigs()));
 INSTANTIATE_TEST_SUITE_P(Io, SnapuserdTest, ::testing::ValuesIn(GetTestConfigs()));
 INSTANTIATE_TEST_SUITE_P(Io, HandlerTest, ::testing::ValuesIn(GetTestConfigs()));