Merge changes from topic "ota-block-size-compression" into main
* changes: snapuserd: Add I/O path support for variable block size libsnapshot_cow: Support multi-block compression
This commit is contained in:
commit
942b213628
17 changed files with 1341 additions and 158 deletions
|
@ -108,7 +108,7 @@ cc_library_static {
|
|||
],
|
||||
srcs: [":libsnapshot_sources"],
|
||||
static_libs: [
|
||||
"libfs_mgr_binder"
|
||||
"libfs_mgr_binder",
|
||||
],
|
||||
}
|
||||
|
||||
|
@ -128,12 +128,12 @@ cc_library {
|
|||
static_libs: [
|
||||
"libc++fs",
|
||||
"libsnapshot_cow",
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
cc_library_static {
|
||||
name: "libsnapshot_init",
|
||||
native_coverage : true,
|
||||
native_coverage: true,
|
||||
defaults: ["libsnapshot_defaults"],
|
||||
srcs: [":libsnapshot_sources"],
|
||||
ramdisk_available: true,
|
||||
|
@ -204,6 +204,10 @@ cc_library_static {
|
|||
"libsnapshot_cow/writer_v2.cpp",
|
||||
"libsnapshot_cow/writer_v3.cpp",
|
||||
],
|
||||
|
||||
header_libs: [
|
||||
"libstorage_literals_headers",
|
||||
],
|
||||
export_include_dirs: ["include"],
|
||||
host_supported: true,
|
||||
recovery_available: true,
|
||||
|
@ -243,7 +247,10 @@ cc_library_static {
|
|||
|
||||
cc_defaults {
|
||||
name: "libsnapshot_test_defaults",
|
||||
defaults: ["libsnapshot_defaults", "libsnapshot_cow_defaults"],
|
||||
defaults: [
|
||||
"libsnapshot_defaults",
|
||||
"libsnapshot_cow_defaults",
|
||||
],
|
||||
srcs: [
|
||||
"partition_cow_creator_test.cpp",
|
||||
"snapshot_metadata_updater_test.cpp",
|
||||
|
@ -283,10 +290,13 @@ cc_defaults {
|
|||
|
||||
cc_test {
|
||||
name: "vts_libsnapshot_test",
|
||||
defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
|
||||
defaults: [
|
||||
"libsnapshot_test_defaults",
|
||||
"libsnapshot_hal_deps",
|
||||
],
|
||||
test_suites: [
|
||||
"vts",
|
||||
"device-tests"
|
||||
"device-tests",
|
||||
],
|
||||
test_options: {
|
||||
min_shipping_api_level: 30,
|
||||
|
@ -295,12 +305,15 @@ cc_test {
|
|||
|
||||
cc_test {
|
||||
name: "vab_legacy_tests",
|
||||
defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
|
||||
defaults: [
|
||||
"libsnapshot_test_defaults",
|
||||
"libsnapshot_hal_deps",
|
||||
],
|
||||
cppflags: [
|
||||
"-DLIBSNAPSHOT_TEST_VAB_LEGACY",
|
||||
],
|
||||
test_suites: [
|
||||
"device-tests"
|
||||
"device-tests",
|
||||
],
|
||||
test_options: {
|
||||
// Legacy VAB launched in Android R.
|
||||
|
@ -310,12 +323,15 @@ cc_test {
|
|||
|
||||
cc_test {
|
||||
name: "vabc_legacy_tests",
|
||||
defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
|
||||
defaults: [
|
||||
"libsnapshot_test_defaults",
|
||||
"libsnapshot_hal_deps",
|
||||
],
|
||||
cppflags: [
|
||||
"-DLIBSNAPSHOT_TEST_VABC_LEGACY",
|
||||
],
|
||||
test_suites: [
|
||||
"device-tests"
|
||||
"device-tests",
|
||||
],
|
||||
test_options: {
|
||||
// Legacy VABC launched in Android S.
|
||||
|
@ -343,7 +359,10 @@ cc_test {
|
|||
|
||||
cc_binary {
|
||||
name: "snapshotctl",
|
||||
defaults: ["libsnapshot_cow_defaults", "libsnapshot_hal_deps"],
|
||||
defaults: [
|
||||
"libsnapshot_cow_defaults",
|
||||
"libsnapshot_hal_deps",
|
||||
],
|
||||
srcs: [
|
||||
"snapshotctl.cpp",
|
||||
],
|
||||
|
@ -412,8 +431,11 @@ cc_test {
|
|||
"libgtest",
|
||||
"libsnapshot_cow",
|
||||
],
|
||||
header_libs: [
|
||||
"libstorage_literals_headers",
|
||||
],
|
||||
test_suites: [
|
||||
"device-tests"
|
||||
"device-tests",
|
||||
],
|
||||
test_options: {
|
||||
min_shipping_api_level: 30,
|
||||
|
|
|
@ -201,6 +201,12 @@ static constexpr uint64_t kCowOpSourceInfoDataMask = (1ULL << 48) - 1;
|
|||
static constexpr uint64_t kCowOpSourceInfoTypeBit = 60;
|
||||
static constexpr uint64_t kCowOpSourceInfoTypeNumBits = 4;
|
||||
static constexpr uint64_t kCowOpSourceInfoTypeMask = (1ULL << kCowOpSourceInfoTypeNumBits) - 1;
|
||||
|
||||
static constexpr uint64_t kCowOpSourceInfoCompressionBit = 57;
|
||||
static constexpr uint64_t kCowOpSourceInfoCompressionNumBits = 3;
|
||||
static constexpr uint64_t kCowOpSourceInfoCompressionMask =
|
||||
((1ULL << kCowOpSourceInfoCompressionNumBits) - 1);
|
||||
|
||||
// The on disk format of cow (currently == CowOperation)
|
||||
struct CowOperationV3 {
|
||||
// If this operation reads from the data section of the COW, this contains
|
||||
|
@ -211,8 +217,8 @@ struct CowOperationV3 {
|
|||
uint32_t new_block;
|
||||
|
||||
// source_info with have the following layout
|
||||
// |---4 bits ---| ---12 bits---| --- 48 bits ---|
|
||||
// |--- type --- | -- unused -- | --- source --- |
|
||||
// |--- 4 bits -- | --------- 3 bits ------ | --- 9 bits --- | --- 48 bits ---|
|
||||
// |--- type --- | -- compression factor --| --- unused --- | --- source --- |
|
||||
//
|
||||
// The value of |source| depends on the operation code.
|
||||
//
|
||||
|
@ -225,6 +231,17 @@ struct CowOperationV3 {
|
|||
// For ops other than Label:
|
||||
// Bits 47-62 are reserved and must be zero.
|
||||
// A block is compressed if it’s data is < block_sz
|
||||
//
|
||||
// Bits [57-59] represents the compression factor.
|
||||
//
|
||||
// Compression - factor
|
||||
// ==========================
|
||||
// 000 - 4k
|
||||
// 001 - 8k
|
||||
// 010 - 16k
|
||||
// ...
|
||||
// 110 - 256k
|
||||
//
|
||||
uint64_t source_info_;
|
||||
constexpr uint64_t source() const { return source_info_ & kCowOpSourceInfoDataMask; }
|
||||
constexpr void set_source(uint64_t source) {
|
||||
|
@ -245,6 +262,20 @@ struct CowOperationV3 {
|
|||
source_info_ |= (static_cast<uint64_t>(type) & kCowOpSourceInfoTypeMask)
|
||||
<< kCowOpSourceInfoTypeBit;
|
||||
}
|
||||
constexpr void set_compression_bits(uint8_t compression_factor) {
|
||||
// Clear the 3 bits from bit 57 - [57-59]
|
||||
source_info_ &= ~(kCowOpSourceInfoCompressionMask << kCowOpSourceInfoCompressionBit);
|
||||
// Set the actual compression factor
|
||||
source_info_ |=
|
||||
(static_cast<uint64_t>(compression_factor) & kCowOpSourceInfoCompressionMask)
|
||||
<< kCowOpSourceInfoCompressionBit;
|
||||
}
|
||||
constexpr uint8_t compression_bits() const {
|
||||
// Grab the 3 bits from [57-59]
|
||||
const auto compression_factor =
|
||||
(source_info_ >> kCowOpSourceInfoCompressionBit) & kCowOpSourceInfoCompressionMask;
|
||||
return static_cast<uint8_t>(compression_factor);
|
||||
}
|
||||
} __attribute__((packed));
|
||||
|
||||
// Ensure that getters/setters added to CowOperationV3 does not increases size
|
||||
|
@ -326,5 +357,11 @@ bool IsOrderedOp(const CowOperation& op);
|
|||
// Convert compression name to internal value.
|
||||
std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
|
||||
|
||||
// Return block size used for compression
|
||||
size_t CowOpCompressionSize(const CowOperation* op, size_t block_size);
|
||||
|
||||
// Return the relative offset of the I/O block which the CowOperation
|
||||
// multi-block compression
|
||||
bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset);
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -162,6 +162,9 @@ class CowReader final : public ICowReader {
|
|||
// Creates a clone of the current CowReader without the file handlers
|
||||
std::unique_ptr<CowReader> CloneCowReader();
|
||||
|
||||
// Get the max compression size
|
||||
uint32_t GetMaxCompressionSize();
|
||||
|
||||
void UpdateMergeOpsCompleted(int num_merge_ops) { header_.num_merge_ops += num_merge_ops; }
|
||||
|
||||
private:
|
||||
|
|
|
@ -119,9 +119,9 @@ class ICowWriter {
|
|||
|
||||
class CompressWorker {
|
||||
public:
|
||||
CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size);
|
||||
CompressWorker(std::unique_ptr<ICompressor>&& compressor);
|
||||
bool RunThread();
|
||||
void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
|
||||
void EnqueueCompressBlocks(const void* buffer, size_t block_size, size_t num_blocks);
|
||||
bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
|
||||
void Finalize();
|
||||
static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression);
|
||||
|
@ -134,20 +134,22 @@ class CompressWorker {
|
|||
struct CompressWork {
|
||||
const void* buffer;
|
||||
size_t num_blocks;
|
||||
size_t block_size;
|
||||
bool compression_status = false;
|
||||
std::vector<std::basic_string<uint8_t>> compressed_data;
|
||||
};
|
||||
|
||||
std::unique_ptr<ICompressor> compressor_;
|
||||
uint32_t block_size_;
|
||||
|
||||
std::queue<CompressWork> work_queue_;
|
||||
std::queue<CompressWork> compressed_queue_;
|
||||
std::mutex lock_;
|
||||
std::condition_variable cv_;
|
||||
bool stopped_ = false;
|
||||
size_t total_submitted_ = 0;
|
||||
size_t total_processed_ = 0;
|
||||
|
||||
bool CompressBlocks(const void* buffer, size_t num_blocks,
|
||||
bool CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
|
||||
std::vector<std::basic_string<uint8_t>>* compressed_data);
|
||||
};
|
||||
|
||||
|
|
|
@ -208,9 +208,9 @@ class ZstdCompressor final : public ICompressor {
|
|||
std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> zstd_context_;
|
||||
};
|
||||
|
||||
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
|
||||
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
|
||||
std::vector<std::basic_string<uint8_t>>* compressed_data) {
|
||||
return CompressBlocks(compressor_.get(), block_size_, buffer, num_blocks, compressed_data);
|
||||
return CompressBlocks(compressor_.get(), block_size, buffer, num_blocks, compressed_data);
|
||||
}
|
||||
|
||||
bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer,
|
||||
|
@ -223,7 +223,7 @@ bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size,
|
|||
PLOG(ERROR) << "CompressBlocks: Compression failed";
|
||||
return false;
|
||||
}
|
||||
if (data.size() > std::numeric_limits<uint16_t>::max()) {
|
||||
if (data.size() > std::numeric_limits<uint32_t>::max()) {
|
||||
LOG(ERROR) << "Compressed block is too large: " << data.size();
|
||||
return false;
|
||||
}
|
||||
|
@ -254,7 +254,8 @@ bool CompressWorker::RunThread() {
|
|||
}
|
||||
|
||||
// Compress blocks
|
||||
bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
|
||||
bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, blocks.block_size,
|
||||
&blocks.compressed_data);
|
||||
blocks.compression_status = ret;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
|
@ -273,35 +274,31 @@ bool CompressWorker::RunThread() {
|
|||
return true;
|
||||
}
|
||||
|
||||
void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
|
||||
void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t block_size,
|
||||
size_t num_blocks) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
|
||||
CompressWork blocks = {};
|
||||
blocks.buffer = buffer;
|
||||
blocks.block_size = block_size;
|
||||
blocks.num_blocks = num_blocks;
|
||||
work_queue_.push(std::move(blocks));
|
||||
total_submitted_ += 1;
|
||||
}
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
|
||||
{
|
||||
while (true) {
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
while (compressed_queue_.empty() && !stopped_) {
|
||||
while ((total_submitted_ != total_processed_) && compressed_queue_.empty() && !stopped_) {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
|
||||
if (stopped_) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
while (compressed_queue_.size() > 0) {
|
||||
CompressWork blocks = std::move(compressed_queue_.front());
|
||||
compressed_queue_.pop();
|
||||
total_processed_ += 1;
|
||||
|
||||
if (blocks.compression_status) {
|
||||
compressed_buf->insert(compressed_buf->end(),
|
||||
|
@ -312,9 +309,12 @@ bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>
|
|||
return false;
|
||||
}
|
||||
}
|
||||
if ((total_submitted_ == total_processed_) || stopped_) {
|
||||
total_submitted_ = 0;
|
||||
total_processed_ = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<ICompressor> ICompressor::Brotli(uint32_t compression_level,
|
||||
|
@ -344,8 +344,8 @@ void CompressWorker::Finalize() {
|
|||
cv_.notify_all();
|
||||
}
|
||||
|
||||
CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size)
|
||||
: compressor_(std::move(compressor)), block_size_(block_size) {}
|
||||
CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor)
|
||||
: compressor_(std::move(compressor)) {}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include <android-base/logging.h>
|
||||
#include <android-base/stringprintf.h>
|
||||
#include <libsnapshot/cow_format.h>
|
||||
#include <storage_literals/storage_literals.h>
|
||||
#include "writer_v2.h"
|
||||
#include "writer_v3.h"
|
||||
|
||||
|
@ -28,6 +29,7 @@ namespace android {
|
|||
namespace snapshot {
|
||||
|
||||
using android::base::unique_fd;
|
||||
using namespace android::storage_literals;
|
||||
|
||||
std::ostream& EmitCowTypeString(std::ostream& os, CowOperationType cow_type) {
|
||||
switch (cow_type) {
|
||||
|
@ -174,5 +176,36 @@ std::unique_ptr<ICowWriter> CreateCowEstimator(uint32_t version, const CowOption
|
|||
return CreateCowWriter(version, options, unique_fd{-1}, std::nullopt);
|
||||
}
|
||||
|
||||
size_t CowOpCompressionSize(const CowOperation* op, size_t block_size) {
|
||||
uint8_t compression_bits = op->compression_bits();
|
||||
return (block_size << compression_bits);
|
||||
}
|
||||
|
||||
bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset) {
|
||||
const uint64_t new_block = op->new_block;
|
||||
|
||||
if (op->type() != kCowReplaceOp || io_block < new_block) {
|
||||
LOG(VERBOSE) << "Invalid IO request for block: " << io_block
|
||||
<< " CowOperation: new_block: " << new_block;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get the actual compression size
|
||||
const size_t compression_size = CowOpCompressionSize(op, block_size);
|
||||
// Find the number of blocks spanned
|
||||
const size_t num_blocks = compression_size / block_size;
|
||||
// Find the distance of the I/O block which this
|
||||
// CowOperation encompasses
|
||||
const size_t block_distance = io_block - new_block;
|
||||
// Check if this block is within this range;
|
||||
// if so, return the relative offset
|
||||
if (block_distance < num_blocks) {
|
||||
*offset = block_distance * block_size;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <android-base/logging.h>
|
||||
#include <libsnapshot/cow_format.h>
|
||||
#include <libsnapshot/cow_reader.h>
|
||||
#include <storage_literals/storage_literals.h>
|
||||
#include <zlib.h>
|
||||
|
||||
#include "cow_decompress.h"
|
||||
|
@ -35,6 +36,8 @@
|
|||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
using namespace android::storage_literals;
|
||||
|
||||
bool ReadCowHeader(android::base::borrowed_fd fd, CowHeaderV3* header) {
|
||||
if (lseek(fd.get(), 0, SEEK_SET) < 0) {
|
||||
PLOG(ERROR) << "lseek header failed";
|
||||
|
@ -161,6 +164,21 @@ bool CowReader::Parse(android::base::borrowed_fd fd, std::optional<uint64_t> lab
|
|||
return PrepMergeOps();
|
||||
}
|
||||
|
||||
uint32_t CowReader::GetMaxCompressionSize() {
|
||||
switch (header_.prefix.major_version) {
|
||||
case 1:
|
||||
case 2:
|
||||
// Old versions supports only 4KB compression.
|
||||
return header_.block_size;
|
||||
;
|
||||
case 3:
|
||||
return header_.max_compression_size;
|
||||
default:
|
||||
LOG(ERROR) << "Unknown version: " << header_.prefix.major_version;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// This sets up the data needed for MergeOpIter. MergeOpIter presents
|
||||
// data in the order we intend to merge in.
|
||||
|
@ -705,6 +723,11 @@ uint8_t CowReader::GetCompressionType() {
|
|||
ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size,
|
||||
size_t ignore_bytes) {
|
||||
std::unique_ptr<IDecompressor> decompressor;
|
||||
const size_t op_buf_size = CowOpCompressionSize(op, header_.block_size);
|
||||
if (!op_buf_size) {
|
||||
LOG(ERROR) << "Compression size is zero. op: " << *op;
|
||||
return -1;
|
||||
}
|
||||
switch (GetCompressionType()) {
|
||||
case kCowCompressNone:
|
||||
break;
|
||||
|
@ -715,12 +738,12 @@ ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_
|
|||
decompressor = IDecompressor::Brotli();
|
||||
break;
|
||||
case kCowCompressZstd:
|
||||
if (header_.block_size != op->data_length) {
|
||||
if (op_buf_size != op->data_length) {
|
||||
decompressor = IDecompressor::Zstd();
|
||||
}
|
||||
break;
|
||||
case kCowCompressLz4:
|
||||
if (header_.block_size != op->data_length) {
|
||||
if (op_buf_size != op->data_length) {
|
||||
decompressor = IDecompressor::Lz4();
|
||||
}
|
||||
break;
|
||||
|
@ -736,14 +759,14 @@ ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_
|
|||
offset = op->source();
|
||||
}
|
||||
if (!decompressor ||
|
||||
((op->data_length == header_.block_size) && (header_.prefix.major_version == 3))) {
|
||||
((op->data_length == op_buf_size) && (header_.prefix.major_version == 3))) {
|
||||
CowDataStream stream(this, offset + ignore_bytes, op->data_length - ignore_bytes);
|
||||
return stream.ReadFully(buffer, buffer_size);
|
||||
}
|
||||
|
||||
CowDataStream stream(this, offset, op->data_length);
|
||||
decompressor->set_stream(&stream);
|
||||
return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
|
||||
return decompressor->Decompress(buffer, buffer_size, op_buf_size, ignore_bytes);
|
||||
}
|
||||
|
||||
bool CowReader::GetSourceOffset(const CowOperation* op, uint64_t* source_offset) {
|
||||
|
|
|
@ -206,6 +206,8 @@ bool CowParserV2::Translate(TranslatedCowOps* out) {
|
|||
|
||||
auto& new_op = out->ops->at(i);
|
||||
new_op.set_type(v2_op.type);
|
||||
// v2 ops always have 4k compression
|
||||
new_op.set_compression_bits(0);
|
||||
new_op.data_length = v2_op.data_length;
|
||||
|
||||
if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {
|
||||
|
|
|
@ -42,10 +42,21 @@ CompressedSnapshotReader::CompressedSnapshotReader(std::unique_ptr<ICowReader>&&
|
|||
op_iter_->Next();
|
||||
continue;
|
||||
}
|
||||
if (op->new_block >= ops_.size()) {
|
||||
ops_.resize(op->new_block + 1, nullptr);
|
||||
|
||||
size_t num_blocks = 1;
|
||||
if (op->type() == kCowReplaceOp) {
|
||||
num_blocks = (CowOpCompressionSize(op, block_size_) / block_size_);
|
||||
}
|
||||
if (op->new_block >= ops_.size()) {
|
||||
ops_.resize(op->new_block + num_blocks, nullptr);
|
||||
}
|
||||
|
||||
size_t vec_index = op->new_block;
|
||||
while (num_blocks) {
|
||||
ops_[vec_index] = op;
|
||||
num_blocks -= 1;
|
||||
vec_index += 1;
|
||||
}
|
||||
ops_[op->new_block] = op;
|
||||
op_iter_->Next();
|
||||
}
|
||||
}
|
||||
|
@ -172,11 +183,20 @@ ssize_t CompressedSnapshotReader::ReadBlock(uint64_t chunk, size_t start_offset,
|
|||
} else if (op->type() == kCowZeroOp) {
|
||||
memset(buffer, 0, bytes_to_read);
|
||||
} else if (op->type() == kCowReplaceOp) {
|
||||
if (cow_->ReadData(op, buffer, bytes_to_read, start_offset) < bytes_to_read) {
|
||||
LOG(ERROR) << "CompressedSnapshotReader failed to read replace op";
|
||||
size_t buffer_size = CowOpCompressionSize(op, block_size_);
|
||||
uint8_t temp_buffer[buffer_size];
|
||||
if (cow_->ReadData(op, temp_buffer, buffer_size, 0) < buffer_size) {
|
||||
LOG(ERROR) << "CompressedSnapshotReader failed to read replace op: buffer_size: "
|
||||
<< buffer_size << "start_offset: " << start_offset;
|
||||
errno = EIO;
|
||||
return -1;
|
||||
}
|
||||
off_t block_offset{};
|
||||
if (!GetBlockOffset(op, chunk, block_size_, &block_offset)) {
|
||||
LOG(ERROR) << "GetBlockOffset failed";
|
||||
return -1;
|
||||
}
|
||||
std::memcpy(buffer, (char*)temp_buffer + block_offset + start_offset, bytes_to_read);
|
||||
} else if (op->type() == kCowXorOp) {
|
||||
borrowed_fd fd = GetSourceFd();
|
||||
if (fd < 0) {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include <libsnapshot/cow_format.h>
|
||||
#include <libsnapshot/cow_reader.h>
|
||||
#include <libsnapshot/cow_writer.h>
|
||||
#include <storage_literals/storage_literals.h>
|
||||
#include "writer_v2.h"
|
||||
#include "writer_v3.h"
|
||||
|
||||
|
@ -29,6 +30,9 @@ using testing::AssertionSuccess;
|
|||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
using namespace android::storage_literals;
|
||||
using ::testing::TestWithParam;
|
||||
|
||||
class CowTestV3 : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() override {
|
||||
|
@ -484,14 +488,14 @@ TEST_F(CowTestV3, BufferMetadataSyncTest) {
|
|||
ASSERT_TRUE(reader.Parse(cow_->fd));
|
||||
|
||||
auto header = reader.header_v3();
|
||||
ASSERT_EQ(header.sequence_data_count, 0);
|
||||
ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
|
||||
ASSERT_EQ(header.resume_point_count, 0);
|
||||
ASSERT_EQ(header.resume_point_max, 4);
|
||||
|
||||
writer->AddLabel(0);
|
||||
ASSERT_TRUE(reader.Parse(cow_->fd));
|
||||
header = reader.header_v3();
|
||||
ASSERT_EQ(header.sequence_data_count, 0);
|
||||
ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
|
||||
ASSERT_EQ(header.resume_point_count, 1);
|
||||
ASSERT_EQ(header.resume_point_max, 4);
|
||||
|
||||
|
@ -699,5 +703,189 @@ TEST_F(CowTestV3, CheckOpCount) {
|
|||
ASSERT_FALSE(writer->AddZeroBlocks(0, 19));
|
||||
}
|
||||
|
||||
struct TestParam {
|
||||
std::string compression;
|
||||
int block_size;
|
||||
int num_threads;
|
||||
size_t cluster_ops;
|
||||
};
|
||||
|
||||
class VariableBlockTest : public ::testing::TestWithParam<TestParam> {
|
||||
protected:
|
||||
virtual void SetUp() override {
|
||||
cow_ = std::make_unique<TemporaryFile>();
|
||||
ASSERT_GE(cow_->fd, 0) << strerror(errno);
|
||||
}
|
||||
|
||||
virtual void TearDown() override { cow_ = nullptr; }
|
||||
|
||||
unique_fd GetCowFd() { return unique_fd{dup(cow_->fd)}; }
|
||||
|
||||
std::unique_ptr<TemporaryFile> cow_;
|
||||
};
|
||||
|
||||
// Helper to check read sizes.
|
||||
static inline void ReadBlockData(CowReader& reader, const CowOperation* op, void* buffer,
|
||||
size_t size) {
|
||||
size_t block_size = CowOpCompressionSize(op, 4096);
|
||||
std::string data(block_size, '\0');
|
||||
size_t value = reader.ReadData(op, data.data(), block_size);
|
||||
ASSERT_TRUE(value == block_size);
|
||||
std::memcpy(buffer, data.data(), size);
|
||||
}
|
||||
|
||||
TEST_P(VariableBlockTest, VariableBlockCompressionTest) {
|
||||
const TestParam params = GetParam();
|
||||
|
||||
CowOptions options;
|
||||
options.op_count_max = 100000;
|
||||
options.compression = params.compression;
|
||||
options.num_compress_threads = params.num_threads;
|
||||
options.batch_write = true;
|
||||
options.compression_factor = params.block_size;
|
||||
options.cluster_ops = params.cluster_ops;
|
||||
|
||||
CowWriterV3 writer(options, GetCowFd());
|
||||
|
||||
ASSERT_TRUE(writer.Initialize());
|
||||
|
||||
std::string xor_data = "This is test data-1. Testing xor";
|
||||
xor_data.resize(options.block_size, '\0');
|
||||
ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));
|
||||
|
||||
// Large number of blocks
|
||||
std::string data = "This is test data-2. Testing replace ops";
|
||||
data.resize(options.block_size * 2048, '\0');
|
||||
ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));
|
||||
|
||||
std::string data2 = "This is test data-3. Testing replace ops";
|
||||
data2.resize(options.block_size * 259, '\0');
|
||||
ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));
|
||||
|
||||
// Test data size is smaller than block size
|
||||
|
||||
// 4k block
|
||||
std::string data3 = "This is test data-4. Testing replace ops";
|
||||
data3.resize(options.block_size, '\0');
|
||||
ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));
|
||||
|
||||
// 8k block
|
||||
std::string data4;
|
||||
data4.resize(options.block_size * 2, '\0');
|
||||
for (size_t i = 0; i < data4.size(); i++) {
|
||||
data4[i] = static_cast<char>('A' + i / options.block_size);
|
||||
}
|
||||
ASSERT_TRUE(writer.AddRawBlocks(10000, data4.data(), data4.size()));
|
||||
|
||||
// 16k block
|
||||
std::string data5;
|
||||
data.resize(options.block_size * 4, '\0');
|
||||
for (int i = 0; i < data5.size(); i++) {
|
||||
data5[i] = static_cast<char>('C' + i / options.block_size);
|
||||
}
|
||||
ASSERT_TRUE(writer.AddRawBlocks(11000, data5.data(), data5.size()));
|
||||
|
||||
// 64k Random buffer which cannot be compressed
|
||||
unique_fd rnd_fd(open("/dev/random", O_RDONLY));
|
||||
ASSERT_GE(rnd_fd, 0);
|
||||
std::string random_buffer;
|
||||
random_buffer.resize(65536, '\0');
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), 65536, 0), true);
|
||||
ASSERT_TRUE(writer.AddRawBlocks(12000, random_buffer.data(), 65536));
|
||||
|
||||
ASSERT_TRUE(writer.Finalize());
|
||||
|
||||
ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
|
||||
|
||||
CowReader reader;
|
||||
ASSERT_TRUE(reader.Parse(cow_->fd));
|
||||
|
||||
auto iter = reader.GetOpIter();
|
||||
ASSERT_NE(iter, nullptr);
|
||||
|
||||
while (!iter->AtEnd()) {
|
||||
auto op = iter->Get();
|
||||
|
||||
if (op->type() == kCowXorOp) {
|
||||
std::string sink(xor_data.size(), '\0');
|
||||
ASSERT_EQ(op->new_block, 50);
|
||||
ASSERT_EQ(op->source(), 98314); // 4096 * 24 + 10
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, xor_data);
|
||||
}
|
||||
if (op->type() == kCowReplaceOp) {
|
||||
if (op->new_block == 100) {
|
||||
data.resize(options.block_size);
|
||||
std::string sink(data.size(), '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink.size(), data.size());
|
||||
ASSERT_EQ(sink, data);
|
||||
}
|
||||
if (op->new_block == 6000) {
|
||||
data2.resize(options.block_size);
|
||||
std::string sink(data2.size(), '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, data2);
|
||||
}
|
||||
if (op->new_block == 9000) {
|
||||
std::string sink(data3.size(), '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, data3);
|
||||
}
|
||||
if (op->new_block == 10000) {
|
||||
data4.resize(options.block_size);
|
||||
std::string sink(options.block_size, '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, data4);
|
||||
}
|
||||
if (op->new_block == 11000) {
|
||||
data5.resize(options.block_size);
|
||||
std::string sink(options.block_size, '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, data5);
|
||||
}
|
||||
if (op->new_block == 12000) {
|
||||
random_buffer.resize(options.block_size);
|
||||
std::string sink(options.block_size, '\0');
|
||||
ReadBlockData(reader, op, sink.data(), sink.size());
|
||||
ASSERT_EQ(sink, random_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
iter->Next();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<TestParam> GetTestConfigs() {
|
||||
std::vector<TestParam> testParams;
|
||||
|
||||
std::vector<int> block_sizes = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
|
||||
std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
|
||||
std::vector<int> threads = {1, 2};
|
||||
// This will also test batch size
|
||||
std::vector<size_t> cluster_ops = {1, 256};
|
||||
|
||||
// This should test 112 combination
|
||||
for (auto block : block_sizes) {
|
||||
for (auto compression : compression_algo) {
|
||||
for (auto thread : threads) {
|
||||
for (auto cluster : cluster_ops) {
|
||||
TestParam param;
|
||||
param.block_size = block;
|
||||
param.compression = compression;
|
||||
param.num_threads = thread;
|
||||
param.cluster_ops = cluster;
|
||||
testParams.push_back(std::move(param));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return testParams;
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(CompressorsWithVariableBlocks, VariableBlockTest,
|
||||
::testing::ValuesIn(GetTestConfigs()));
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
@ -185,7 +185,7 @@ void CowWriterV2::InitWorkers() {
|
|||
for (int i = 0; i < num_compress_threads_; i++) {
|
||||
std::unique_ptr<ICompressor> compressor =
|
||||
ICompressor::Create(compression_, header_.block_size);
|
||||
auto wt = std::make_unique<CompressWorker>(std::move(compressor), header_.block_size);
|
||||
auto wt = std::make_unique<CompressWorker>(std::move(compressor));
|
||||
threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
|
||||
compress_threads_.push_back(std::move(wt));
|
||||
}
|
||||
|
@ -353,7 +353,7 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
|
|||
if (i == num_threads - 1) {
|
||||
num_blocks_per_thread = num_blocks;
|
||||
}
|
||||
worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
|
||||
worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread);
|
||||
iter += (num_blocks_per_thread * header_.block_size);
|
||||
num_blocks -= num_blocks_per_thread;
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
#include <libsnapshot/cow_compress.h>
|
||||
#include <libsnapshot_cow/parser_v3.h>
|
||||
#include <linux/fs.h>
|
||||
#include <storage_literals/storage_literals.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <unistd.h>
|
||||
#include <numeric>
|
||||
|
@ -54,6 +55,7 @@ namespace snapshot {
|
|||
|
||||
static_assert(sizeof(off_t) == sizeof(uint64_t));
|
||||
|
||||
using namespace android::storage_literals;
|
||||
using android::base::unique_fd;
|
||||
|
||||
// Divide |x| by |y| and round up to the nearest integer.
|
||||
|
@ -77,9 +79,9 @@ void CowWriterV3::InitWorkers() {
|
|||
threads_.clear();
|
||||
for (size_t i = 0; i < num_compress_threads_; i++) {
|
||||
std::unique_ptr<ICompressor> compressor =
|
||||
ICompressor::Create(compression_, header_.block_size);
|
||||
ICompressor::Create(compression_, header_.max_compression_size);
|
||||
auto&& wt = compress_threads_.emplace_back(
|
||||
std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
|
||||
std::make_unique<CompressWorker>(std::move(compressor)));
|
||||
threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
|
||||
}
|
||||
LOG(INFO) << num_compress_threads_ << " thread used for compression";
|
||||
|
@ -111,10 +113,15 @@ void CowWriterV3::SetupHeaders() {
|
|||
header_.op_count_max = 0;
|
||||
header_.compression_algorithm = kCowCompressNone;
|
||||
header_.max_compression_size = options_.compression_factor;
|
||||
return;
|
||||
}
|
||||
|
||||
bool CowWriterV3::ParseOptions() {
|
||||
if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) {
|
||||
LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size;
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Compression factor: " << header_.max_compression_size;
|
||||
num_compress_threads_ = std::max(int(options_.num_compress_threads), 1);
|
||||
auto parts = android::base::Split(options_.compression, ",");
|
||||
if (parts.size() > 2) {
|
||||
|
@ -154,20 +161,22 @@ bool CowWriterV3::ParseOptions() {
|
|||
|
||||
compression_.algorithm = *algorithm;
|
||||
if (compression_.algorithm != kCowCompressNone) {
|
||||
compressor_ = ICompressor::Create(compression_, header_.block_size);
|
||||
compressor_ = ICompressor::Create(compression_, header_.max_compression_size);
|
||||
if (compressor_ == nullptr) {
|
||||
LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
|
||||
return false;
|
||||
}
|
||||
if (options_.cluster_ops &&
|
||||
(android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
|
||||
options_.batch_write)) {
|
||||
batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
|
||||
data_vec_.reserve(batch_size_);
|
||||
cached_data_.reserve(batch_size_);
|
||||
cached_ops_.reserve(batch_size_);
|
||||
}
|
||||
}
|
||||
|
||||
if (options_.cluster_ops &&
|
||||
(android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
|
||||
options_.batch_write)) {
|
||||
batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
|
||||
data_vec_.reserve(batch_size_);
|
||||
cached_data_.reserve(batch_size_);
|
||||
cached_ops_.reserve(batch_size_);
|
||||
}
|
||||
|
||||
if (batch_size_ > 1) {
|
||||
LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
|
||||
} else {
|
||||
|
@ -178,6 +187,7 @@ bool CowWriterV3::ParseOptions() {
|
|||
num_compress_threads_ = options_.num_compress_threads;
|
||||
}
|
||||
InitWorkers();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -206,6 +216,14 @@ bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: b/322279333
|
||||
// Set compression factor to 4k during estimation.
|
||||
// Once COW estimator is ready to support variable
|
||||
// block size, this check has to be removed.
|
||||
if (IsEstimating()) {
|
||||
header_.max_compression_size = header_.block_size;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -328,6 +346,46 @@ bool CowWriterV3::NeedsFlush() const {
|
|||
return cached_data_.size() >= batch_size_ || cached_ops_.size() >= batch_size_ * 16;
|
||||
}
|
||||
|
||||
bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
|
||||
uint64_t old_block, uint16_t offset,
|
||||
CowOperationType type, size_t blocks_to_write) {
|
||||
size_t compressed_bytes = 0;
|
||||
auto&& blocks = CompressBlocks(blocks_to_write, data, type);
|
||||
if (blocks.empty()) {
|
||||
LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write
|
||||
<< ", actual number of blocks received from compressor " << blocks.size();
|
||||
return false;
|
||||
}
|
||||
size_t blocks_written = 0;
|
||||
for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) {
|
||||
CowOperation& op = cached_ops_.emplace_back();
|
||||
auto& vec = data_vec_.emplace_back();
|
||||
CompressedBuffer buffer = std::move(blocks[blk_index]);
|
||||
auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data));
|
||||
op.new_block = new_block_start + blocks_written;
|
||||
|
||||
op.set_type(type);
|
||||
op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size));
|
||||
|
||||
if (type == kCowXorOp) {
|
||||
op.set_source((old_block + blocks_written) * header_.block_size + offset);
|
||||
} else {
|
||||
op.set_source(next_data_pos_ + compressed_bytes);
|
||||
}
|
||||
|
||||
vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
|
||||
op.data_length = vec.iov_len;
|
||||
compressed_bytes += op.data_length;
|
||||
blocks_written += (buffer.compression_factor / header_.block_size);
|
||||
}
|
||||
if (blocks_written != blocks_to_write) {
|
||||
LOG(ERROR) << "Total compressed blocks: " << blocks_written
|
||||
<< " Expected: " << blocks_to_write;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
|
||||
uint64_t old_block, uint16_t offset, CowOperationType type) {
|
||||
if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
|
||||
|
@ -341,38 +399,21 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
|
|||
return false;
|
||||
}
|
||||
for (size_t i = 0; i < num_blocks;) {
|
||||
const auto blocks_to_write =
|
||||
const size_t blocks_to_write =
|
||||
std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
|
||||
size_t compressed_bytes = 0;
|
||||
auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
|
||||
if (blocks.size() != blocks_to_write) {
|
||||
LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
|
||||
<< blocks_to_write << ", actual number of blocks received from compressor "
|
||||
<< blocks.size();
|
||||
|
||||
if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i,
|
||||
old_block + i, offset, type, blocks_to_write)) {
|
||||
return false;
|
||||
}
|
||||
for (size_t j = 0; j < blocks_to_write; j++) {
|
||||
CowOperation& op = cached_ops_.emplace_back();
|
||||
auto& vec = data_vec_.emplace_back();
|
||||
auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
|
||||
op.new_block = new_block_start + i + j;
|
||||
|
||||
op.set_type(type);
|
||||
if (type == kCowXorOp) {
|
||||
op.set_source((old_block + i + j) * header_.block_size + offset);
|
||||
} else {
|
||||
op.set_source(next_data_pos_ + compressed_bytes);
|
||||
}
|
||||
vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
|
||||
op.data_length = vec.iov_len;
|
||||
compressed_bytes += op.data_length;
|
||||
}
|
||||
if (NeedsFlush() && !FlushCacheOps()) {
|
||||
LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
|
||||
<< new_block_start << " compression: " << compression_.algorithm
|
||||
<< ", op type: " << type;
|
||||
return false;
|
||||
}
|
||||
|
||||
i += blocks_to_write;
|
||||
}
|
||||
|
||||
|
@ -482,55 +523,165 @@ bool CowWriterV3::FlushCacheOps() {
|
|||
return true;
|
||||
}
|
||||
|
||||
std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
|
||||
const void* data) {
|
||||
const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
|
||||
const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
|
||||
std::vector<std::basic_string<uint8_t>> compressed_buf;
|
||||
compressed_buf.clear();
|
||||
const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
|
||||
if (compression_.algorithm == kCowCompressNone) {
|
||||
for (size_t i = 0; i < num_blocks; i++) {
|
||||
auto& buf = compressed_buf.emplace_back();
|
||||
buf.resize(header_.block_size);
|
||||
std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
|
||||
}
|
||||
return compressed_buf;
|
||||
size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress,
|
||||
CowOperationType type) const {
|
||||
// For XOR ops, we don't support bigger block size compression yet.
|
||||
// For bigger block size support, snapshot-merge also has to changed. We
|
||||
// aren't there yet; hence, just stick to 4k for now until
|
||||
// snapshot-merge is ready for XOR operation.
|
||||
if (type == kCowXorOp) {
|
||||
return header_.block_size;
|
||||
}
|
||||
if (num_threads <= 1) {
|
||||
if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
|
||||
&compressed_buf)) {
|
||||
|
||||
size_t compression_factor = header_.max_compression_size;
|
||||
while (compression_factor > header_.block_size) {
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
if (blocks_to_compress >= num_blocks) {
|
||||
return compression_factor;
|
||||
}
|
||||
compression_factor >>= 1;
|
||||
}
|
||||
return header_.block_size;
|
||||
}
|
||||
|
||||
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithNoCompression(
|
||||
const size_t num_blocks, const void* data, CowOperationType type) {
|
||||
size_t blocks_to_compress = num_blocks;
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
std::vector<CompressedBuffer> compressed_vec;
|
||||
|
||||
while (blocks_to_compress) {
|
||||
CompressedBuffer buffer;
|
||||
|
||||
const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
|
||||
buffer.compression_factor = compression_factor;
|
||||
buffer.compressed_data.resize(compression_factor);
|
||||
|
||||
// No compression. Just copy the data as-is.
|
||||
std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
|
||||
|
||||
compressed_vec.push_back(std::move(buffer));
|
||||
blocks_to_compress -= num_blocks;
|
||||
iter += compression_factor;
|
||||
}
|
||||
return compressed_vec;
|
||||
}
|
||||
|
||||
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompression(
|
||||
const size_t num_blocks, const void* data, CowOperationType type) {
|
||||
size_t blocks_to_compress = num_blocks;
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
std::vector<CompressedBuffer> compressed_vec;
|
||||
|
||||
while (blocks_to_compress) {
|
||||
CompressedBuffer buffer;
|
||||
|
||||
const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
|
||||
buffer.compression_factor = compression_factor;
|
||||
// Compress the blocks
|
||||
buffer.compressed_data = compressor_->Compress(iter, compression_factor);
|
||||
if (buffer.compressed_data.empty()) {
|
||||
PLOG(ERROR) << "Compression failed";
|
||||
return {};
|
||||
}
|
||||
} else {
|
||||
// Submit the blocks per thread. The retrieval of
|
||||
// compressed buffers has to be done in the same order.
|
||||
// We should not poll for completed buffers in a different order as the
|
||||
// buffers are tightly coupled with block ordering.
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
const auto blocks_in_batch =
|
||||
std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
|
||||
worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
|
||||
blocks_in_batch);
|
||||
|
||||
// Check if the buffer was indeed compressed
|
||||
if (buffer.compressed_data.size() >= compression_factor) {
|
||||
buffer.compressed_data.resize(compression_factor);
|
||||
std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
if (!worker->GetCompressedBuffers(&compressed_buf)) {
|
||||
return {};
|
||||
}
|
||||
compressed_vec.push_back(std::move(buffer));
|
||||
blocks_to_compress -= num_blocks;
|
||||
iter += compression_factor;
|
||||
}
|
||||
return compressed_vec;
|
||||
}
|
||||
|
||||
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
|
||||
const size_t num_blocks, const void* data, CowOperationType type) {
|
||||
const size_t num_threads = num_compress_threads_;
|
||||
const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
|
||||
std::vector<CompressedBuffer> compressed_vec;
|
||||
// Submit the blocks per thread. The retrieval of
|
||||
// compressed buffers has to be done in the same order.
|
||||
// We should not poll for completed buffers in a different order as the
|
||||
// buffers are tightly coupled with block ordering.
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
|
||||
// Enqueue the blocks to be compressed for each thread.
|
||||
while (blocks_in_batch) {
|
||||
CompressedBuffer buffer;
|
||||
|
||||
const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type);
|
||||
size_t num_blocks = compression_factor / header_.block_size;
|
||||
|
||||
buffer.compression_factor = compression_factor;
|
||||
worker->EnqueueCompressBlocks(iter, compression_factor, 1);
|
||||
compressed_vec.push_back(std::move(buffer));
|
||||
blocks_in_batch -= num_blocks;
|
||||
iter += compression_factor;
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < num_blocks; i++) {
|
||||
|
||||
// Fetch compressed buffers from the threads
|
||||
std::vector<std::basic_string<uint8_t>> compressed_buf;
|
||||
compressed_buf.clear();
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
if (!worker->GetCompressedBuffers(&compressed_buf)) {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
if (compressed_vec.size() != compressed_buf.size()) {
|
||||
LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
|
||||
<< " - Expected: " << compressed_vec.size();
|
||||
return {};
|
||||
}
|
||||
|
||||
iter = reinterpret_cast<const uint8_t*>(data);
|
||||
// Walk through all the compressed buffers
|
||||
for (size_t i = 0; i < compressed_buf.size(); i++) {
|
||||
auto& buffer = compressed_vec[i];
|
||||
auto& block = compressed_buf[i];
|
||||
if (block.size() >= header_.block_size) {
|
||||
block.resize(header_.block_size);
|
||||
std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
|
||||
size_t block_size = buffer.compression_factor;
|
||||
// Check if the blocks was indeed compressed
|
||||
if (block.size() >= block_size) {
|
||||
buffer.compressed_data.resize(block_size);
|
||||
std::memcpy(buffer.compressed_data.data(), iter, block_size);
|
||||
} else {
|
||||
// Compressed block
|
||||
buffer.compressed_data.resize(block.size());
|
||||
std::memcpy(buffer.compressed_data.data(), block.data(), block.size());
|
||||
}
|
||||
iter += block_size;
|
||||
}
|
||||
return compressed_vec;
|
||||
}
|
||||
|
||||
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks,
|
||||
const void* data,
|
||||
CowOperationType type) {
|
||||
if (compression_.algorithm == kCowCompressNone) {
|
||||
return ProcessBlocksWithNoCompression(num_blocks, data, type);
|
||||
}
|
||||
|
||||
return compressed_buf;
|
||||
const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
|
||||
|
||||
// If no threads are required, just compress the blocks inline.
|
||||
if (num_threads <= 1) {
|
||||
return ProcessBlocksWithCompression(num_blocks, data, type);
|
||||
}
|
||||
|
||||
return ProcessBlocksWithThreadedCompression(num_blocks, data, type);
|
||||
}
|
||||
|
||||
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
|
||||
|
|
|
@ -19,11 +19,15 @@
|
|||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <libsnapshot/cow_format.h>
|
||||
#include <storage_literals/storage_literals.h>
|
||||
#include "writer_base.h"
|
||||
|
||||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
using namespace android::storage_literals;
|
||||
|
||||
class CowWriterV3 : public CowWriterBase {
|
||||
public:
|
||||
explicit CowWriterV3(const CowOptions& options, android::base::unique_fd&& fd);
|
||||
|
@ -43,6 +47,10 @@ class CowWriterV3 : public CowWriterBase {
|
|||
virtual bool EmitSequenceData(size_t num_ops, const uint32_t* data) override;
|
||||
|
||||
private:
|
||||
struct CompressedBuffer {
|
||||
size_t compression_factor;
|
||||
std::basic_string<uint8_t> compressed_data;
|
||||
};
|
||||
void SetupHeaders();
|
||||
bool NeedsFlush() const;
|
||||
bool ParseOptions();
|
||||
|
@ -52,11 +60,38 @@ class CowWriterV3 : public CowWriterBase {
|
|||
std::basic_string_view<struct iovec> data);
|
||||
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
|
||||
uint16_t offset, CowOperationType type);
|
||||
bool ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
|
||||
uint64_t old_block, uint16_t offset, CowOperationType type,
|
||||
size_t blocks_to_write);
|
||||
bool CheckOpCount(size_t op_count);
|
||||
|
||||
private:
|
||||
std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
|
||||
const void* data);
|
||||
std::vector<CompressedBuffer> ProcessBlocksWithNoCompression(const size_t num_blocks,
|
||||
const void* data,
|
||||
CowOperationType type);
|
||||
std::vector<CompressedBuffer> ProcessBlocksWithCompression(const size_t num_blocks,
|
||||
const void* data,
|
||||
CowOperationType type);
|
||||
std::vector<CompressedBuffer> ProcessBlocksWithThreadedCompression(const size_t num_blocks,
|
||||
const void* data,
|
||||
CowOperationType type);
|
||||
std::vector<CompressedBuffer> CompressBlocks(const size_t num_blocks, const void* data,
|
||||
CowOperationType type);
|
||||
size_t GetCompressionFactor(const size_t blocks_to_compress, CowOperationType type) const;
|
||||
|
||||
constexpr bool IsBlockAligned(const size_t size) {
|
||||
// These are the only block size supported. Block size beyond 256k
|
||||
// may impact random read performance post OTA boot.
|
||||
const size_t values[] = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
|
||||
|
||||
auto it = std::lower_bound(std::begin(values), std::end(values), size);
|
||||
|
||||
if (it != std::end(values) && *it == size) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ReadBackVerification();
|
||||
bool FlushCacheOps();
|
||||
void InitWorkers();
|
||||
|
|
|
@ -13,10 +13,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
#include "merge_worker.h"
|
||||
|
||||
#include <libsnapshot/cow_format.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "merge_worker.h"
|
||||
#include "snapuserd_core.h"
|
||||
#include "utility.h"
|
||||
|
||||
|
@ -37,6 +38,7 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
|||
int num_ops = *pending_ops;
|
||||
int nr_consecutive = 0;
|
||||
bool checkOrderedOp = (replace_zero_vec == nullptr);
|
||||
size_t num_blocks = 1;
|
||||
|
||||
do {
|
||||
if (!cowop_iter_->AtEnd() && num_ops) {
|
||||
|
@ -48,11 +50,15 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
|||
*source_offset = cow_op->new_block * BLOCK_SZ;
|
||||
if (!checkOrderedOp) {
|
||||
replace_zero_vec->push_back(cow_op);
|
||||
if (cow_op->type() == kCowReplaceOp) {
|
||||
// Get the number of blocks this op has compressed
|
||||
num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
|
||||
}
|
||||
}
|
||||
|
||||
cowop_iter_->Next();
|
||||
num_ops -= 1;
|
||||
nr_consecutive = 1;
|
||||
num_ops -= num_blocks;
|
||||
nr_consecutive = num_blocks;
|
||||
|
||||
while (!cowop_iter_->AtEnd() && num_ops) {
|
||||
const CowOperation* op = cowop_iter_->Get();
|
||||
|
@ -66,11 +72,20 @@ int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
|
|||
}
|
||||
|
||||
if (!checkOrderedOp) {
|
||||
if (op->type() == kCowReplaceOp) {
|
||||
num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
|
||||
if (num_ops < num_blocks) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// zero op
|
||||
num_blocks = 1;
|
||||
}
|
||||
replace_zero_vec->push_back(op);
|
||||
}
|
||||
|
||||
nr_consecutive += 1;
|
||||
num_ops -= 1;
|
||||
nr_consecutive += num_blocks;
|
||||
num_ops -= num_blocks;
|
||||
cowop_iter_->Next();
|
||||
}
|
||||
}
|
||||
|
@ -108,18 +123,24 @@ bool MergeWorker::MergeReplaceZeroOps() {
|
|||
|
||||
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
|
||||
const CowOperation* cow_op = replace_zero_vec[i];
|
||||
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
|
||||
return false;
|
||||
}
|
||||
if (cow_op->type() == kCowReplaceOp) {
|
||||
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
|
||||
size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
|
||||
void* buffer = bufsink_.AcquireBuffer(buffer_size);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
|
||||
return false;
|
||||
}
|
||||
// Read the entire compressed buffer spanning multiple blocks
|
||||
if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
|
||||
SNAP_LOG(ERROR) << "Failed to read COW in merge";
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
|
||||
return false;
|
||||
}
|
||||
CHECK(cow_op->type() == kCowZeroOp);
|
||||
memset(buffer, 0, BLOCK_SZ);
|
||||
}
|
||||
|
@ -137,7 +158,7 @@ bool MergeWorker::MergeReplaceZeroOps() {
|
|||
return false;
|
||||
}
|
||||
|
||||
num_ops_merged += linear_blocks;
|
||||
num_ops_merged += replace_zero_vec.size();
|
||||
|
||||
if (num_ops_merged >= total_ops_merged_per_commit) {
|
||||
// Flush the data
|
||||
|
|
|
@ -14,10 +14,10 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include "read_worker.h"
|
||||
|
||||
#include <libsnapshot/cow_format.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "read_worker.h"
|
||||
#include "snapuserd_core.h"
|
||||
#include "utility.h"
|
||||
|
||||
|
@ -48,9 +48,10 @@ ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing
|
|||
// Start the replace operation. This will read the
|
||||
// internal COW format and if the block is compressed,
|
||||
// it will be de-compressed.
|
||||
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) {
|
||||
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
|
||||
SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
|
||||
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size) {
|
||||
if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
|
||||
SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block
|
||||
<< " buffer_size: " << buffer_size;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
@ -183,7 +184,13 @@ bool ReadWorker::ProcessCowOp(const CowOperation* cow_op, void* buffer) {
|
|||
|
||||
switch (cow_op->type()) {
|
||||
case kCowReplaceOp: {
|
||||
return ProcessReplaceOp(cow_op, buffer);
|
||||
size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
|
||||
uint8_t chunk[buffer_size];
|
||||
if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) {
|
||||
return false;
|
||||
}
|
||||
std::memcpy(buffer, chunk, BLOCK_SZ);
|
||||
return true;
|
||||
}
|
||||
|
||||
case kCowZeroOp: {
|
||||
|
@ -209,6 +216,13 @@ bool ReadWorker::Init() {
|
|||
return false;
|
||||
}
|
||||
|
||||
const size_t compression_factor = reader_->GetMaxCompressionSize();
|
||||
if (!compression_factor) {
|
||||
SNAP_LOG(ERROR) << "Compression factor is set to 0 which is invalid.";
|
||||
return false;
|
||||
}
|
||||
decompressed_buffer_ = std::make_unique<uint8_t[]>(compression_factor);
|
||||
|
||||
backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
|
||||
if (backing_store_fd_ < 0) {
|
||||
SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
|
||||
|
@ -276,6 +290,20 @@ bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t re
|
|||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block,
|
||||
off_t* block_offset) {
|
||||
// If this is a replace op, get the block offset of this I/O
|
||||
// block. Multi-block compression is supported only for
|
||||
// Replace ops.
|
||||
//
|
||||
// Note: This can be extended when we support COPY and XOR ops down the
|
||||
// line as the blocks are mostly contiguous.
|
||||
if (cow_op && cow_op->type() == kCowReplaceOp) {
|
||||
return GetBlockOffset(cow_op, io_block, BLOCK_SZ, block_offset);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
||||
size_t remaining_size = sz;
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
|
||||
|
@ -286,7 +314,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);
|
||||
|
||||
size_t total_bytes_read = 0;
|
||||
|
||||
const CowOperation* prev_op = nullptr;
|
||||
while (read_size) {
|
||||
// We need to check every 4k block to verify if it is
|
||||
// present in the mapping.
|
||||
|
@ -294,7 +322,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
|
||||
auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
|
||||
std::make_pair(sector, nullptr), SnapshotHandler::compare);
|
||||
bool not_found = (it == chunk_vec.end() || it->first != sector);
|
||||
const bool sector_not_found = (it == chunk_vec.end() || it->first != sector);
|
||||
|
||||
void* buffer = block_server_->GetResponseBuffer(BLOCK_SZ, size);
|
||||
if (!buffer) {
|
||||
|
@ -302,15 +330,88 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (not_found) {
|
||||
// Block not found in map - which means this block was not
|
||||
// changed as per the OTA. Just route the I/O to the base
|
||||
// device.
|
||||
if (!ReadDataFromBaseDevice(sector, buffer, size)) {
|
||||
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
|
||||
return false;
|
||||
if (sector_not_found) {
|
||||
// Find the 4k block
|
||||
uint64_t io_block = SectorToChunk(sector);
|
||||
// Get the previous iterator. Since the vector is sorted, the
|
||||
// lookup of this sector can fall in a range of blocks if
|
||||
// CowOperation has compressed multiple blocks.
|
||||
if (it != chunk_vec.begin()) {
|
||||
std::advance(it, -1);
|
||||
}
|
||||
|
||||
bool is_mapping_present = true;
|
||||
|
||||
// Vector itself is empty. This can happen if the block was not
|
||||
// changed per the OTA or if the merge was already complete but
|
||||
// snapshot table was not yet collapsed.
|
||||
if (it == chunk_vec.end()) {
|
||||
is_mapping_present = false;
|
||||
}
|
||||
|
||||
const CowOperation* cow_op = nullptr;
|
||||
// Relative offset within the compressed multiple blocks
|
||||
off_t block_offset = 0;
|
||||
if (is_mapping_present) {
|
||||
// Get the nearest operation found in the vector
|
||||
cow_op = it->second;
|
||||
is_mapping_present = GetCowOpBlockOffset(cow_op, io_block, &block_offset);
|
||||
}
|
||||
|
||||
// Thus, we have a case wherein sector was not found in the sorted
|
||||
// vector; however, we indeed have a mapping of this sector
|
||||
// embedded in one of the CowOperation which spans multiple
|
||||
// block size.
|
||||
if (is_mapping_present) {
|
||||
// block_offset = 0 would mean that the CowOperation should
|
||||
// already be in the sorted vector. Hence, lookup should
|
||||
// have already found it. If not, this is a bug.
|
||||
if (block_offset == 0) {
|
||||
SNAP_LOG(ERROR)
|
||||
<< "GetBlockOffset returned offset 0 for io_block: " << io_block;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Get the CowOperation actual compression size
|
||||
size_t compression_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
|
||||
// Offset cannot be greater than the compression size
|
||||
if (block_offset > compression_size) {
|
||||
SNAP_LOG(ERROR) << "Invalid I/O block found. io_block: " << io_block
|
||||
<< " CowOperation-new-block: " << cow_op->new_block
|
||||
<< " compression-size: " << compression_size;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Cached copy of the previous iteration. Just retrieve the
|
||||
// data
|
||||
if (prev_op && prev_op->new_block == cow_op->new_block) {
|
||||
std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size);
|
||||
} else {
|
||||
// Get the data from the disk based on the compression
|
||||
// size
|
||||
if (!ProcessReplaceOp(cow_op, decompressed_buffer_.get(),
|
||||
compression_size)) {
|
||||
return false;
|
||||
}
|
||||
// Copy the data from the decompressed buffer relative
|
||||
// to the i/o block offset.
|
||||
std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size);
|
||||
// Cache this CowOperation pointer for successive I/O
|
||||
// operation. Since the request is sequential and the
|
||||
// block is already decompressed, subsequest I/O blocks
|
||||
// can fetch the data directly from this decompressed
|
||||
// buffer.
|
||||
prev_op = cow_op;
|
||||
}
|
||||
} else {
|
||||
// Block not found in map - which means this block was not
|
||||
// changed as per the OTA. Just route the I/O to the base
|
||||
// device.
|
||||
if (!ReadDataFromBaseDevice(sector, buffer, size)) {
|
||||
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
ret = size;
|
||||
} else {
|
||||
// We found the sector in mapping. Check the type of COW OP and
|
||||
|
@ -341,12 +442,50 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset,
|
||||
loff_t cow_op_offset) {
|
||||
const bool replace_op = (cow_op->type() == kCowReplaceOp);
|
||||
if (replace_op) {
|
||||
size_t max_compressed_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
|
||||
if ((requested_offset >= cow_op_offset) &&
|
||||
(requested_offset < (cow_op_offset + max_compressed_size))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int ReadWorker::ReadUnalignedSector(
|
||||
sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
|
||||
SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
|
||||
<< " Aligned sector: " << it->first;
|
||||
|
||||
loff_t requested_offset = sector << SECTOR_SHIFT;
|
||||
loff_t final_offset = (it->first) << SECTOR_SHIFT;
|
||||
|
||||
const CowOperation* cow_op = it->second;
|
||||
if (IsMappingPresent(cow_op, requested_offset, final_offset)) {
|
||||
size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
|
||||
uint8_t chunk[buffer_size];
|
||||
// Read the entire decompressed buffer based on the block-size
|
||||
if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) {
|
||||
return -1;
|
||||
}
|
||||
size_t skip_offset = (requested_offset - final_offset);
|
||||
size_t write_sz = std::min(size, buffer_size - skip_offset);
|
||||
|
||||
auto buffer =
|
||||
reinterpret_cast<uint8_t*>(block_server_->GetResponseBuffer(BLOCK_SZ, write_sz));
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "ReadUnalignedSector failed to allocate buffer";
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::memcpy(buffer, (char*)chunk + skip_offset, write_sz);
|
||||
return write_sz;
|
||||
}
|
||||
|
||||
int num_sectors_skip = sector - it->first;
|
||||
size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
|
||||
size_t write_size = std::min(size, BLOCK_SZ - skip_size);
|
||||
|
@ -445,8 +584,11 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
|||
|
||||
size_t remaining_size = size;
|
||||
int ret = 0;
|
||||
|
||||
const CowOperation* cow_op = it->second;
|
||||
if (!merge_complete && (requested_offset >= final_offset) &&
|
||||
(requested_offset - final_offset) < BLOCK_SZ) {
|
||||
(((requested_offset - final_offset) < BLOCK_SZ) ||
|
||||
IsMappingPresent(cow_op, requested_offset, final_offset))) {
|
||||
// Read the partial un-aligned data
|
||||
ret = ReadUnalignedSector(sector, remaining_size, it);
|
||||
if (ret < 0) {
|
||||
|
|
|
@ -44,9 +44,12 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
|
|||
bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size);
|
||||
bool ProcessZeroOp(void* buffer);
|
||||
|
||||
bool IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset,
|
||||
loff_t cow_op_offset);
|
||||
bool GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block, off_t* block_offset);
|
||||
bool ReadAlignedSector(sector_t sector, size_t sz);
|
||||
bool ReadUnalignedSector(sector_t sector, size_t size);
|
||||
int ReadUnalignedSector(sector_t sector, size_t size,
|
||||
|
@ -56,6 +59,7 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
|
|||
|
||||
constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
|
||||
constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
|
||||
constexpr chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
|
||||
|
||||
std::string backing_store_device_;
|
||||
unique_fd backing_store_fd_;
|
||||
|
@ -67,6 +71,7 @@ class ReadWorker : public Worker, public IBlockServer::Delegate {
|
|||
|
||||
std::basic_string<uint8_t> xor_buffer_;
|
||||
std::unique_ptr<void, decltype(&::free)> aligned_buffer_;
|
||||
std::unique_ptr<uint8_t[]> decompressed_buffer_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
@ -64,6 +64,9 @@ using ::testing::TestWithParam;
|
|||
struct TestParam {
|
||||
bool io_uring;
|
||||
bool o_direct;
|
||||
std::string compression;
|
||||
int block_size;
|
||||
int num_threads;
|
||||
};
|
||||
|
||||
class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
|
||||
|
@ -74,6 +77,7 @@ class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
|
|||
void CreateCowDevice();
|
||||
void SetDeviceControlName();
|
||||
std::unique_ptr<ICowWriter> CreateCowDeviceInternal();
|
||||
std::unique_ptr<ICowWriter> CreateV3Cow();
|
||||
|
||||
std::unique_ptr<ITestHarness> harness_;
|
||||
size_t size_ = 10_MiB;
|
||||
|
@ -133,6 +137,24 @@ std::unique_ptr<ICowWriter> SnapuserdTestBase::CreateCowDeviceInternal() {
|
|||
return CreateCowWriter(kDefaultCowVersion, options, std::move(fd));
|
||||
}
|
||||
|
||||
std::unique_ptr<ICowWriter> SnapuserdTestBase::CreateV3Cow() {
|
||||
const TestParam params = GetParam();
|
||||
|
||||
CowOptions options;
|
||||
options.op_count_max = 100000;
|
||||
options.compression = params.compression;
|
||||
options.num_compress_threads = params.num_threads;
|
||||
options.batch_write = true;
|
||||
options.compression_factor = params.block_size;
|
||||
|
||||
cow_system_ = std::make_unique<TemporaryFile>();
|
||||
|
||||
unique_fd fd(cow_system_->fd);
|
||||
cow_system_->fd = -1;
|
||||
|
||||
return CreateCowWriter(3, options, std::move(fd));
|
||||
}
|
||||
|
||||
void SnapuserdTestBase::CreateCowDevice() {
|
||||
unique_fd rnd_fd;
|
||||
loff_t offset = 0;
|
||||
|
@ -236,6 +258,7 @@ class SnapuserdTest : public SnapuserdTestBase {
|
|||
void SetupOrderedOpsInverted();
|
||||
void SetupCopyOverlap_1();
|
||||
void SetupCopyOverlap_2();
|
||||
void SetupDeviceForPassthrough();
|
||||
bool Merge();
|
||||
void ValidateMerge();
|
||||
void ReadSnapshotDeviceAndValidate();
|
||||
|
@ -258,6 +281,9 @@ class SnapuserdTest : public SnapuserdTestBase {
|
|||
|
||||
void SimulateDaemonRestart();
|
||||
|
||||
void CreateCowDeviceWithNoBlockChanges();
|
||||
void ValidateDeviceWithNoBlockChanges();
|
||||
|
||||
void CreateCowDeviceOrderedOps();
|
||||
void CreateCowDeviceOrderedOpsInverted();
|
||||
void CreateCowDeviceWithCopyOverlap_1();
|
||||
|
@ -307,6 +333,12 @@ void SnapuserdTest::SetupOrderedOps() {
|
|||
ASSERT_NO_FATAL_FAILURE(SetupDaemon());
|
||||
}
|
||||
|
||||
void SnapuserdTest::SetupDeviceForPassthrough() {
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateCowDeviceWithNoBlockChanges());
|
||||
ASSERT_NO_FATAL_FAILURE(SetupDaemon());
|
||||
}
|
||||
|
||||
void SnapuserdTest::SetupOrderedOpsInverted() {
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateCowDeviceOrderedOpsInverted());
|
||||
|
@ -480,6 +512,47 @@ void SnapuserdTest::CreateCowDeviceWithCopyOverlap_2() {
|
|||
}
|
||||
}
|
||||
|
||||
void SnapuserdTest::CreateCowDeviceWithNoBlockChanges() {
|
||||
auto writer = CreateCowDeviceInternal();
|
||||
ASSERT_NE(writer, nullptr);
|
||||
|
||||
std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(BLOCK_SZ);
|
||||
std::memset(buffer.get(), 'A', BLOCK_SZ);
|
||||
|
||||
// This test focusses on not changing all the blocks thereby validating
|
||||
// the pass-through I/O
|
||||
|
||||
// Replace the first block
|
||||
ASSERT_TRUE(writer->AddRawBlocks(1, buffer.get(), BLOCK_SZ));
|
||||
|
||||
// Set zero block of Block 3
|
||||
ASSERT_TRUE(writer->AddZeroBlocks(3, 1));
|
||||
|
||||
ASSERT_TRUE(writer->Finalize());
|
||||
orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
|
||||
|
||||
// Read the entire base device
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
|
||||
true);
|
||||
|
||||
off_t offset = BLOCK_SZ;
|
||||
std::memcpy(orig_buffer_.get() + offset, buffer.get(), BLOCK_SZ);
|
||||
offset = 3 * BLOCK_SZ;
|
||||
std::memset(orig_buffer_.get() + offset, 0, BLOCK_SZ);
|
||||
}
|
||||
|
||||
void SnapuserdTest::ValidateDeviceWithNoBlockChanges() {
|
||||
unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
|
||||
ASSERT_GE(fd, 0);
|
||||
std::unique_ptr<uint8_t[]> snapshot_buffer = std::make_unique<uint8_t[]>(size_);
|
||||
std::memset(snapshot_buffer.get(), 'B', size_);
|
||||
|
||||
// All the I/O request should be a pass through to base device except for
|
||||
// Block 1 and Block 3.
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
|
||||
ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
|
||||
}
|
||||
|
||||
void SnapuserdTest::CreateCowDeviceWithCopyOverlap_1() {
|
||||
auto writer = CreateCowDeviceInternal();
|
||||
ASSERT_NE(writer, nullptr);
|
||||
|
@ -781,6 +854,20 @@ void SnapuserdTest::MergeInterrupt() {
|
|||
ASSERT_TRUE(Merge());
|
||||
}
|
||||
|
||||
TEST_P(SnapuserdTest, Snapshot_Passthrough) {
|
||||
if (!harness_->HasUserDevice()) {
|
||||
GTEST_SKIP() << "Skipping snapshot read; not supported";
|
||||
}
|
||||
ASSERT_NO_FATAL_FAILURE(SetupDeviceForPassthrough());
|
||||
// I/O before merge
|
||||
ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
|
||||
ASSERT_TRUE(Merge());
|
||||
ValidateMerge();
|
||||
// I/O after merge - daemon should read directly
|
||||
// from base device
|
||||
ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
|
||||
}
|
||||
|
||||
TEST_P(SnapuserdTest, Snapshot_IO_TEST) {
|
||||
if (!harness_->HasUserDevice()) {
|
||||
GTEST_SKIP() << "Skipping snapshot read; not supported";
|
||||
|
@ -853,7 +940,7 @@ TEST_P(SnapuserdTest, Snapshot_COPY_Overlap_Merge_Resume_IO_Validate_TEST) {
|
|||
GTEST_SKIP() << "Skipping snapshot read; not supported";
|
||||
}
|
||||
ASSERT_NO_FATAL_FAILURE(SetupCopyOverlap_2());
|
||||
ASSERT_NO_FATAL_FAILURE(MergeInterruptAndValidate(2));
|
||||
ASSERT_NO_FATAL_FAILURE(MergeInterruptFixed(300));
|
||||
ValidateMerge();
|
||||
}
|
||||
|
||||
|
@ -881,11 +968,243 @@ TEST_P(SnapuserdTest, Snapshot_Merge_Crash_Random_Inverted) {
|
|||
ValidateMerge();
|
||||
}
|
||||
|
||||
class SnapuserdVariableBlockSizeTest : public SnapuserdTest {
|
||||
public:
|
||||
void SetupCowV3ForVariableBlockSize();
|
||||
void ReadSnapshotWithVariableBlockSize();
|
||||
|
||||
protected:
|
||||
void SetUp() override;
|
||||
void TearDown() override;
|
||||
|
||||
void CreateV3CowDeviceForVariableBlockSize();
|
||||
};
|
||||
|
||||
void SnapuserdVariableBlockSizeTest::SetupCowV3ForVariableBlockSize() {
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateV3CowDeviceForVariableBlockSize());
|
||||
ASSERT_NO_FATAL_FAILURE(SetupDaemon());
|
||||
}
|
||||
|
||||
void SnapuserdVariableBlockSizeTest::CreateV3CowDeviceForVariableBlockSize() {
|
||||
auto writer = CreateV3Cow();
|
||||
|
||||
size_t total_data_to_write = size_;
|
||||
|
||||
size_t total_blocks_to_write = total_data_to_write / BLOCK_SZ;
|
||||
size_t num_blocks_per_op = total_blocks_to_write / 4;
|
||||
size_t source_block = 0;
|
||||
|
||||
size_t seq_len = num_blocks_per_op;
|
||||
uint32_t sequence[seq_len];
|
||||
size_t xor_block_start = seq_len * 3;
|
||||
for (size_t i = 0; i < seq_len; i++) {
|
||||
sequence[i] = xor_block_start + i;
|
||||
}
|
||||
ASSERT_TRUE(writer->AddSequenceData(seq_len, sequence));
|
||||
|
||||
size_t total_replace_blocks = num_blocks_per_op;
|
||||
// Write some data which can be compressed
|
||||
std::string data;
|
||||
data.resize(total_replace_blocks * BLOCK_SZ, '\0');
|
||||
for (size_t i = 0; i < data.size(); i++) {
|
||||
data[i] = static_cast<char>('A' + i / BLOCK_SZ);
|
||||
}
|
||||
// REPLACE ops
|
||||
ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size()));
|
||||
|
||||
total_blocks_to_write -= total_replace_blocks;
|
||||
source_block = source_block + total_replace_blocks;
|
||||
|
||||
// ZERO ops
|
||||
size_t total_zero_blocks = total_blocks_to_write / 3;
|
||||
ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks));
|
||||
|
||||
total_blocks_to_write -= total_zero_blocks;
|
||||
source_block = source_block + total_zero_blocks;
|
||||
|
||||
// Generate some random data wherein few blocks cannot be compressed.
|
||||
// This is to test the I/O path for those blocks which aren't compressed.
|
||||
size_t total_random_data_blocks = total_blocks_to_write / 2;
|
||||
unique_fd rnd_fd(open("/dev/random", O_RDONLY));
|
||||
|
||||
ASSERT_GE(rnd_fd, 0);
|
||||
std::string random_buffer;
|
||||
random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0');
|
||||
ASSERT_EQ(
|
||||
android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0),
|
||||
true);
|
||||
// REPLACE ops
|
||||
ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size()));
|
||||
|
||||
total_blocks_to_write -= total_random_data_blocks;
|
||||
source_block = source_block + total_random_data_blocks;
|
||||
|
||||
// XOR ops will always be 4k blocks
|
||||
std::string xor_buffer;
|
||||
xor_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0');
|
||||
for (size_t i = 0; i < xor_buffer.size(); i++) {
|
||||
xor_buffer[i] = static_cast<char>('C' + i / BLOCK_SZ);
|
||||
}
|
||||
size_t xor_offset = 21;
|
||||
std::string source_buffer;
|
||||
source_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0');
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, source_buffer.data(), source_buffer.size(),
|
||||
size_ + xor_offset),
|
||||
true);
|
||||
for (size_t i = 0; i < xor_buffer.size(); i++) {
|
||||
xor_buffer[i] ^= source_buffer[i];
|
||||
}
|
||||
|
||||
ASSERT_EQ(xor_block_start, source_block);
|
||||
|
||||
ASSERT_TRUE(writer->AddXorBlocks(source_block, xor_buffer.data(), xor_buffer.size(),
|
||||
(size_ / BLOCK_SZ), xor_offset));
|
||||
// Flush operations
|
||||
ASSERT_TRUE(writer->Finalize());
|
||||
|
||||
// Construct the buffer required for validation
|
||||
orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
|
||||
|
||||
// Read the entire base device
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
|
||||
true);
|
||||
|
||||
// REPLACE ops which are compressed
|
||||
std::memcpy(orig_buffer_.get(), data.data(), data.size());
|
||||
size_t offset = data.size();
|
||||
|
||||
// ZERO ops
|
||||
std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0);
|
||||
std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size());
|
||||
offset += zero_buffer.size();
|
||||
|
||||
// REPLACE ops - Random buffers which aren't compressed
|
||||
std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size());
|
||||
offset += random_buffer.size();
|
||||
|
||||
// XOR Ops which default to 4k block size compression irrespective of
|
||||
// compression factor
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, (char*)orig_buffer_.get() + offset,
|
||||
xor_buffer.size(), size_ + xor_offset),
|
||||
true);
|
||||
for (size_t i = 0; i < xor_buffer.size(); i++) {
|
||||
orig_buffer_.get()[offset + i] = (uint8_t)(orig_buffer_.get()[offset + i] ^ xor_buffer[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void SnapuserdVariableBlockSizeTest::ReadSnapshotWithVariableBlockSize() {
|
||||
unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY | O_DIRECT));
|
||||
ASSERT_GE(fd, 0);
|
||||
|
||||
void* addr;
|
||||
ssize_t page_size = getpagesize();
|
||||
ASSERT_EQ(posix_memalign(&addr, page_size, size_), 0);
|
||||
std::unique_ptr<void, decltype(&::free)> snapshot_buffer(addr, ::free);
|
||||
|
||||
const TestParam params = GetParam();
|
||||
|
||||
// Issue I/O request with various block sizes
|
||||
size_t num_blocks = size_ / params.block_size;
|
||||
off_t offset = 0;
|
||||
for (size_t i = 0; i < num_blocks; i++) {
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, (char*)snapshot_buffer.get() + offset, params.block_size,
|
||||
offset),
|
||||
true);
|
||||
offset += params.block_size;
|
||||
}
|
||||
// Validate buffer
|
||||
ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
|
||||
|
||||
// Reset the buffer
|
||||
std::memset(snapshot_buffer.get(), 0, size_);
|
||||
|
||||
// Read one full chunk in a single shot and re-validate.
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
|
||||
ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
|
||||
|
||||
// Reset the buffer
|
||||
std::memset(snapshot_buffer.get(), 0, size_);
|
||||
|
||||
// Buffered I/O test
|
||||
fd.reset(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
|
||||
ASSERT_GE(fd, 0);
|
||||
|
||||
// Try not to cache
|
||||
posix_fadvise(fd.get(), 0, size_, POSIX_FADV_DONTNEED);
|
||||
|
||||
size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
|
||||
offset = num_blocks_per_op * BLOCK_SZ;
|
||||
size_t read_size = 1019; // bytes
|
||||
offset -= 111;
|
||||
|
||||
// Issue a un-aligned read which crosses the boundary between a REPLACE block and a ZERO
|
||||
// block.
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
|
||||
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
|
||||
|
||||
offset = (num_blocks_per_op * 3) * BLOCK_SZ;
|
||||
offset -= (BLOCK_SZ - 119);
|
||||
read_size = 8111;
|
||||
|
||||
// Issue an un-aligned read which crosses the boundary between a REPLACE block of random
|
||||
// un-compressed data and a XOR block
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
|
||||
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
|
||||
|
||||
// Reset the buffer
|
||||
std::memset(snapshot_buffer.get(), 0, size_);
|
||||
|
||||
// Read just one byte at an odd offset which is a REPLACE op
|
||||
offset = 19;
|
||||
read_size = 1;
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
|
||||
|
||||
// Reset the buffer
|
||||
std::memset(snapshot_buffer.get(), 0, size_);
|
||||
|
||||
// Read a block which has no mapping to a COW operation. This read should be
|
||||
// a pass-through to the underlying base device.
|
||||
offset = size_ + 9342;
|
||||
read_size = 30;
|
||||
ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
|
||||
}
|
||||
|
||||
void SnapuserdVariableBlockSizeTest::SetUp() {
|
||||
ASSERT_NO_FATAL_FAILURE(SnapuserdTest::SetUp());
|
||||
}
|
||||
|
||||
void SnapuserdVariableBlockSizeTest::TearDown() {
|
||||
SnapuserdTest::TearDown();
|
||||
}
|
||||
|
||||
TEST_P(SnapuserdVariableBlockSizeTest, Snapshot_Test_Variable_Block_Size) {
|
||||
if (!harness_->HasUserDevice()) {
|
||||
GTEST_SKIP() << "Skipping snapshot read; not supported";
|
||||
}
|
||||
ASSERT_NO_FATAL_FAILURE(SetupCowV3ForVariableBlockSize());
|
||||
ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
|
||||
ASSERT_TRUE(StartMerge());
|
||||
CheckMergeCompletion();
|
||||
ValidateMerge();
|
||||
ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
|
||||
}
|
||||
|
||||
class HandlerTest : public SnapuserdTestBase {
|
||||
protected:
|
||||
void SetUp() override;
|
||||
void TearDown() override;
|
||||
|
||||
void SetUpV2Cow();
|
||||
void InitializeDevice();
|
||||
AssertionResult ReadSectors(sector_t sector, uint64_t size, void* buffer);
|
||||
|
||||
TestBlockServerFactory factory_;
|
||||
|
@ -896,10 +1215,11 @@ class HandlerTest : public SnapuserdTestBase {
|
|||
std::future<bool> handler_thread_;
|
||||
};
|
||||
|
||||
void HandlerTest::SetUp() {
|
||||
ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
void HandlerTest::SetUpV2Cow() {
|
||||
ASSERT_NO_FATAL_FAILURE(CreateCowDevice());
|
||||
}
|
||||
|
||||
void HandlerTest::InitializeDevice() {
|
||||
ASSERT_NO_FATAL_FAILURE(SetDeviceControlName());
|
||||
|
||||
opener_ = factory_.CreateTestOpener(system_device_ctrl_name_);
|
||||
|
@ -921,6 +1241,13 @@ void HandlerTest::SetUp() {
|
|||
handler_thread_ = std::async(std::launch::async, &SnapshotHandler::Start, handler_.get());
|
||||
}
|
||||
|
||||
void HandlerTest::SetUp() {
|
||||
ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
ASSERT_NO_FATAL_FAILURE(SetUpV2Cow());
|
||||
ASSERT_NO_FATAL_FAILURE(InitializeDevice());
|
||||
}
|
||||
|
||||
void HandlerTest::TearDown() {
|
||||
ASSERT_TRUE(factory_.DeleteQueue(system_device_ctrl_name_));
|
||||
ASSERT_TRUE(handler_thread_.get());
|
||||
|
@ -986,6 +1313,147 @@ TEST_P(HandlerTest, ReadUnalignedSize) {
|
|||
ASSERT_EQ(memcmp(snapuserd_buffer.get(), orig_buffer_.get(), SECTOR_SIZE), 0);
|
||||
}
|
||||
|
||||
class HandlerTestV3 : public HandlerTest {
|
||||
public:
|
||||
void ReadSnapshotWithVariableBlockSize();
|
||||
|
||||
protected:
|
||||
void SetUp() override;
|
||||
void TearDown() override;
|
||||
void SetUpV3Cow();
|
||||
};
|
||||
|
||||
void HandlerTestV3::SetUp() {
|
||||
ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
|
||||
ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
|
||||
ASSERT_NO_FATAL_FAILURE(SetUpV3Cow());
|
||||
ASSERT_NO_FATAL_FAILURE(InitializeDevice());
|
||||
}
|
||||
|
||||
void HandlerTestV3::TearDown() {
|
||||
ASSERT_NO_FATAL_FAILURE(HandlerTest::TearDown());
|
||||
}
|
||||
|
||||
void HandlerTestV3::SetUpV3Cow() {
|
||||
auto writer = CreateV3Cow();
|
||||
|
||||
size_t total_data_to_write = size_;
|
||||
|
||||
size_t total_blocks_to_write = total_data_to_write / BLOCK_SZ;
|
||||
size_t num_blocks_per_op = total_blocks_to_write / 4;
|
||||
size_t source_block = 0;
|
||||
|
||||
size_t total_replace_blocks = num_blocks_per_op;
|
||||
// Write some data which can be compressed
|
||||
std::string data;
|
||||
data.resize(total_replace_blocks * BLOCK_SZ, '\0');
|
||||
for (size_t i = 0; i < data.size(); i++) {
|
||||
data[i] = static_cast<char>('A' + i / BLOCK_SZ);
|
||||
}
|
||||
// REPLACE ops
|
||||
ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size()));
|
||||
|
||||
total_blocks_to_write -= total_replace_blocks;
|
||||
source_block = source_block + total_replace_blocks;
|
||||
|
||||
// ZERO ops
|
||||
size_t total_zero_blocks = total_blocks_to_write / 3;
|
||||
ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks));
|
||||
|
||||
total_blocks_to_write -= total_zero_blocks;
|
||||
source_block = source_block + total_zero_blocks;
|
||||
|
||||
// Generate some random data wherein few blocks cannot be compressed.
|
||||
// This is to test the I/O path for those blocks which aren't compressed.
|
||||
size_t total_random_data_blocks = total_blocks_to_write;
|
||||
unique_fd rnd_fd(open("/dev/random", O_RDONLY));
|
||||
|
||||
ASSERT_GE(rnd_fd, 0);
|
||||
std::string random_buffer;
|
||||
random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0');
|
||||
ASSERT_EQ(
|
||||
android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0),
|
||||
true);
|
||||
// REPLACE ops
|
||||
ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size()));
|
||||
// Flush operations
|
||||
ASSERT_TRUE(writer->Finalize());
|
||||
|
||||
// Construct the buffer required for validation
|
||||
orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
|
||||
|
||||
// Read the entire base device
|
||||
ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
|
||||
true);
|
||||
|
||||
// REPLACE ops which are compressed
|
||||
std::memcpy(orig_buffer_.get(), data.data(), data.size());
|
||||
size_t offset = data.size();
|
||||
|
||||
// ZERO ops
|
||||
std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0);
|
||||
std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size());
|
||||
offset += zero_buffer.size();
|
||||
|
||||
// REPLACE ops - Random buffers which aren't compressed
|
||||
std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size());
|
||||
}
|
||||
|
||||
TEST_P(HandlerTestV3, Read) {
|
||||
std::unique_ptr<uint8_t[]> snapuserd_buffer = std::make_unique<uint8_t[]>(size_);
|
||||
|
||||
size_t read_size = SECTOR_SIZE;
|
||||
off_t offset = 0;
|
||||
// Read the first sector
|
||||
ASSERT_TRUE(ReadSectors(1, read_size, snapuserd_buffer.get()));
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), orig_buffer_.get(), read_size), 0);
|
||||
|
||||
// Read the second block at offset 7680 (Sector 15). This will map to the
|
||||
// first COW operation for variable block size
|
||||
offset += (((BLOCK_SZ * 2) - SECTOR_SIZE));
|
||||
read_size = BLOCK_SZ; // Span across two REPLACE ops
|
||||
ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
|
||||
0);
|
||||
|
||||
// Fill some other data since we are going to read zero blocks
|
||||
std::memset(snapuserd_buffer.get(), 'Z', size_);
|
||||
|
||||
size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
|
||||
offset = num_blocks_per_op * BLOCK_SZ;
|
||||
// Issue read spanning between a REPLACE op and ZERO ops. The starting point
|
||||
// is the last REPLACE op at sector 5118
|
||||
offset -= (SECTOR_SIZE * 2);
|
||||
// This will make sure it falls back to aligned reads after reading the
|
||||
// first unaligned block
|
||||
read_size = BLOCK_SZ * 6;
|
||||
ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
|
||||
0);
|
||||
|
||||
// Issue I/O request at the last block. The first chunk of (SECTOR_SIZE * 2)
|
||||
// will be from REPLACE op which has random buffers
|
||||
offset = (size_ - (SECTOR_SIZE * 2));
|
||||
// Request will span beyond the COW mapping, thereby fetching data from base
|
||||
// device.
|
||||
read_size = BLOCK_SZ * 8;
|
||||
ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
|
||||
0);
|
||||
|
||||
// Issue I/O request which are not mapped to any COW operations
|
||||
offset = (size_ + (SECTOR_SIZE * 3));
|
||||
read_size = BLOCK_SZ * 3;
|
||||
ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
|
||||
// Validate the data
|
||||
ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
|
||||
0);
|
||||
}
|
||||
|
||||
std::vector<bool> GetIoUringConfigs() {
|
||||
#if __ANDROID__
|
||||
if (!android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false)) {
|
||||
|
@ -1018,6 +1486,37 @@ std::vector<TestParam> GetTestConfigs() {
|
|||
return testParams;
|
||||
}
|
||||
|
||||
std::vector<TestParam> GetVariableBlockTestConfigs() {
|
||||
std::vector<TestParam> testParams;
|
||||
|
||||
std::vector<int> block_sizes = {4096, 8192, 16384, 32768, 65536, 131072};
|
||||
std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
|
||||
std::vector<int> threads = {1, 2};
|
||||
std::vector<bool> uring_configs = GetIoUringConfigs();
|
||||
|
||||
// This should test 96 combination and validates the I/O path
|
||||
for (auto block : block_sizes) {
|
||||
for (auto compression : compression_algo) {
|
||||
for (auto thread : threads) {
|
||||
for (auto io_uring : uring_configs) {
|
||||
TestParam param;
|
||||
param.block_size = block;
|
||||
param.compression = compression;
|
||||
param.num_threads = thread;
|
||||
param.io_uring = io_uring;
|
||||
param.o_direct = false;
|
||||
testParams.push_back(std::move(param));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return testParams;
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(Io, SnapuserdVariableBlockSizeTest,
|
||||
::testing::ValuesIn(GetVariableBlockTestConfigs()));
|
||||
INSTANTIATE_TEST_SUITE_P(Io, HandlerTestV3, ::testing::ValuesIn(GetVariableBlockTestConfigs()));
|
||||
INSTANTIATE_TEST_SUITE_P(Io, SnapuserdTest, ::testing::ValuesIn(GetTestConfigs()));
|
||||
INSTANTIATE_TEST_SUITE_P(Io, HandlerTest, ::testing::ValuesIn(GetTestConfigs()));
|
||||
|
||||
|
|
Loading…
Reference in a new issue