diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 4df0e76ec..de097f5b0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include
 
 // The info messages here are spammy, but as useful for update_engine. Disable
 // them when running on the host.
@@ -55,7 +56,7 @@ static_assert(sizeof(off_t) == sizeof(uint64_t));
 using android::base::unique_fd;
 
 CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
-    : CowWriterBase(options, std::move(fd)) {
+    : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
     SetupHeaders();
 }
 
@@ -70,6 +71,9 @@ void CowWriterV3::SetupHeaders() {
     header_.block_size = options_.block_size;
     header_.num_merge_ops = options_.num_merge_ops;
     header_.cluster_ops = 0;
+    if (batch_size_ > 1) {
+        LOG(INFO) << "Batch writes enabled with batch size of " << batch_size_;
+    }
     if (options_.scratch_space) {
         header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
     }
@@ -258,40 +262,49 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
             }
             op.data_length = header_.block_size;
         }
-        return WriteOperation({ops.data(), ops.size()},
-                              {reinterpret_cast<const uint8_t*>(data), size});
+        return WriteOperation({ops.data(), ops.size()}, data, size);
     }
 
-    const auto saved_op_count = header_.op_count;
-    const auto saved_data_pos = next_data_pos_;
-    for (size_t i = 0; i < num_blocks; i++) {
-        const uint8_t* const iter =
-                reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
+    for (size_t i = 0; i < num_blocks; i += batch_size_) {
+        const auto blocks_to_write = std::min(batch_size_, num_blocks - i);
+        std::vector<std::basic_string<uint8_t>> compressed_blocks(blocks_to_write);
+        std::vector<CowOperationV3> ops(blocks_to_write);
+        std::vector<struct iovec> vec(blocks_to_write);
+        size_t compressed_bytes = 0;
+        for (size_t j = 0; j < blocks_to_write; j++) {
+            const uint8_t* const iter =
+                    reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
 
-        CowOperation op{};
-        op.new_block = new_block_start + i;
+            CowOperation& op = ops[j];
+            op.new_block = new_block_start + i + j;
 
-        op.set_type(type);
-        if (type == kCowXorOp) {
-            op.set_source((old_block + i) * header_.block_size + offset);
-        } else {
-            op.set_source(next_data_pos_);
+            op.set_type(type);
+            if (type == kCowXorOp) {
+                op.set_source((old_block + i + j) * header_.block_size + offset);
+            } else {
+                op.set_source(next_data_pos_ + compressed_bytes);
+            }
+
+            std::basic_string<uint8_t> compressed_data =
+                    compressor_->Compress(iter, header_.block_size);
+            if (compressed_data.empty()) {
+                LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
+                           << num_blocks << ");";
+                return false;
+            }
+            if (compressed_data.size() >= header_.block_size) {
+                compressed_data.resize(header_.block_size);
+                std::memcpy(compressed_data.data(), iter, header_.block_size);
+            }
+            compressed_blocks[j] = std::move(compressed_data);
+            vec[j] = {.iov_base = compressed_blocks[j].data(),
+                      .iov_len = compressed_blocks[j].size()};
+            op.data_length = vec[j].iov_len;
+            compressed_bytes += op.data_length;
         }
-        const void* out_data = iter;
-
-        op.data_length = header_.block_size;
-
-        const std::basic_string<uint8_t> compressed_data =
-                compressor_->Compress(out_data, header_.block_size);
-        if (compressed_data.size() < op.data_length) {
-            out_data = compressed_data.data();
-            op.data_length = compressed_data.size();
-        }
-        if (!WriteOperation(op, out_data, op.data_length)) {
+        if (!WriteOperation({ops.data(), ops.size()}, {vec.data(), vec.size()})) {
             PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
                         << new_block_start << " compression: " << compression_.algorithm;
-            header_.op_count = saved_op_count;
-            next_data_pos_ = saved_data_pos;
             return false;
         }
     }
@@ -299,14 +312,14 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
     return true;
 }
 
-bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
+bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, const uint64_t num_blocks) {
     std::vector<CowOperationV3> ops(num_blocks);
-    for (uint64_t i = 0; i < num_blocks; i++) {
-        CowOperationV3& op = ops[i];
+    for (uint64_t i = 0; i < ops.size(); i++) {
+        auto& op = ops[i];
         op.set_type(kCowZeroOp);
         op.new_block = new_block_start + i;
     }
-    if (!WriteOperation({ops.data(), ops.size()}, {})) {
+    if (!WriteOperation({ops.data(), ops.size()})) {
         return false;
     }
     return true;
@@ -353,8 +366,23 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
     return true;
 }
 
+bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data,
+                                 size_t size) {
+    struct iovec vec {
+        .iov_len = size
+    };
+    // Dear C++ god, this is effectively a const_cast. I had to do this because
+    // pwritev()'s struct iovec requires a non-const pointer. The input data
+    // will not be modified, as the iovec is only used for a write operation.
+    std::memcpy(&vec.iov_base, &data, sizeof(data));
+    return WriteOperation(op, {&vec, 1});
+}
+
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
-                                 std::basic_string_view<uint8_t> data) {
+                                 std::basic_string_view<struct iovec> data) {
+    const auto total_data_size =
+            std::transform_reduce(data.begin(), data.end(), 0, std::plus{},
+                                  [](const struct iovec& a) { return a.iov_len; });
     if (IsEstimating()) {
         header_.op_count += ops.size();
         if (header_.op_count > header_.op_count_max) {
@@ -363,7 +391,7 @@ bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
             next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
             header_.op_count_max = header_.op_count;
         }
-        next_data_pos_ += data.size();
+        next_data_pos_ += total_data_size;
         return true;
     }
 
@@ -379,20 +407,21 @@ bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
         return false;
     }
     if (!data.empty()) {
-        if (!android::base::WriteFullyAtOffset(fd_, data.data(), data.size(), next_data_pos_)) {
+        const auto ret = pwritev(fd_, data.data(), data.size(), next_data_pos_);
+        if (ret != total_data_size) {
             PLOG(ERROR) << "write failed for data of size: " << data.size()
-                        << " at offset: " << next_data_pos_;
+                        << " at offset: " << next_data_pos_ << " " << ret;
             return false;
         }
     }
 
     header_.op_count += ops.size();
-    next_data_pos_ += data.size();
+    next_data_pos_ += total_data_size;
     return true;
 }
 
 bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
-    return WriteOperation({&op, 1}, {reinterpret_cast<const uint8_t*>(data), size});
+    return WriteOperation({&op, 1}, data, size);
 }
 
 bool CowWriterV3::Finalize() {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 02b4e611b..93f1d245d 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -46,7 +46,9 @@ class CowWriterV3 : public CowWriterBase {
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
    bool WriteOperation(std::basic_string_view<CowOperationV3> op,
-                        std::basic_string_view<uint8_t> data);
+                        std::basic_string_view<struct iovec> data);
+    bool WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data = nullptr,
+                        size_t size = 0);
     bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
@@ -68,6 +70,7 @@ class CowWriterV3 : public CowWriterBase {
     // in the case that we are using one thread for compression, we can store and re-use the same
     // compressor
     int num_compress_threads_ = 1;
+    size_t batch_size_ = 0;
 };
 
 }  // namespace snapshot
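For reviewers unfamiliar with the vectored-write API: the reworked `WriteOperation(ops, data)` above gathers one `struct iovec` per compressed block and submits the whole batch with a single `pwritev()`, then checks the return value against the summed lengths (`total_data_size`). The sketch below shows only that pattern in isolation; it is not part of the patch, and the `write_all_at()` helper name and demo file path are made up for illustration.

```cpp
#include <fcntl.h>
#include <sys/uio.h>
#include <unistd.h>

#include <cstdio>
#include <numeric>
#include <vector>

// Flush several buffers at a fixed file offset with one pwritev() call and
// validate the result against the summed iovec lengths.
static bool write_all_at(int fd, const std::vector<struct iovec>& vec, off_t offset) {
    const size_t total = std::accumulate(
            vec.begin(), vec.end(), size_t{0},
            [](size_t acc, const struct iovec& v) { return acc + v.iov_len; });
    const ssize_t ret = pwritev(fd, vec.data(), vec.size(), offset);
    // A short write (ret < total) is treated as failure here; a production
    // writer could instead retry with the remaining iovec entries.
    return ret >= 0 && static_cast<size_t>(ret) == total;
}

int main() {
    const int fd = open("/tmp/iovec_demo.bin", O_CREAT | O_TRUNC | O_WRONLY, 0644);
    if (fd < 0) {
        perror("open");
        return 1;
    }
    char a[] = "first block ";
    char b[] = "second block";
    const std::vector<struct iovec> vec = {
            {.iov_base = a, .iov_len = sizeof(a) - 1},
            {.iov_base = b, .iov_len = sizeof(b) - 1},
    };
    const bool ok = write_all_at(fd, vec, /*offset=*/0);  // one syscall writes both buffers
    close(fd);
    return ok ? 0 : 1;
}
```

The design trade-off mirrors the patch: buffers stay where they were produced (no extra memcpy into a staging buffer), and the number of write syscalls drops from one per block to one per batch of `batch_size_` blocks.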