Adding struct to hold compresion parameters

Since we're adding compression levels should consolidate this
information into one structure.  Adding in CowCompression struct
to hold this information and refactoring code to work off this struct

Test: ota
Change-Id: I969a3ae19ec80fd964bcfb76b39f42f8dd31a56d
This commit is contained in:
Daniel Zheng 2023-08-08 08:51:48 -07:00
parent 1cf0e90409
commit 1cb36d300e
4 changed files with 16 additions and 11 deletions

View file

@ -161,6 +161,10 @@ enum CowCompressionAlgorithm : uint8_t {
kCowCompressLz4 = 3,
kCowCompressZstd = 4,
};
struct CowCompression {
CowCompressionAlgorithm algorithm = kCowCompressNone;
uint32_t compression_level = 0;
};
static constexpr uint8_t kCowReadAheadNotStarted = 0;
static constexpr uint8_t kCowReadAheadInProgress = 1;

View file

@ -472,9 +472,10 @@ TEST_P(CompressionTest, HorribleStream) {
if (strcmp(GetParam(), "none") == 0) {
GTEST_SKIP();
}
CowCompression compression;
auto algorithm = CompressionAlgorithmFromString(GetParam());
ASSERT_TRUE(algorithm.has_value());
compression.algorithm = algorithm.value();
std::string expected = "The quick brown fox jumps over the lazy dog.";
expected.resize(4096, '\0');

View file

@ -124,7 +124,7 @@ bool CowWriterV2::ParseOptions() {
LOG(ERROR) << "unrecognized compression: " << options_.compression;
return false;
}
compression_ = *algorithm;
compression_.algorithm = *algorithm;
if (options_.cluster_ops == 1) {
LOG(ERROR) << "Clusters must contain at least two operations to function.";
@ -165,7 +165,7 @@ void CowWriterV2::InitWorkers() {
return;
}
for (int i = 0; i < num_compress_threads_; i++) {
auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
auto wt = std::make_unique<CompressWorker>(compression_.algorithm, header_.block_size);
threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
compress_threads_.push_back(std::move(wt));
}
@ -320,8 +320,8 @@ bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
compressed_buf_.clear();
if (num_threads <= 1) {
return CompressWorker::CompressBlocks(compression_, options_.block_size, data, num_blocks,
&compressed_buf_);
return CompressWorker::CompressBlocks(compression_.algorithm, options_.block_size, data,
num_blocks, &compressed_buf_);
}
// Submit the blocks per thread. The retrieval of
@ -366,7 +366,7 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
while (num_blocks) {
size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
if (compression_ && num_compress_threads_ > 1) {
if (compression_.algorithm && num_compress_threads_ > 1) {
if (!CompressBlocks(pending_blocks, iter)) {
return false;
}
@ -386,19 +386,19 @@ bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t
op.source = next_data_pos_;
}
if (compression_) {
if (compression_.algorithm) {
auto data = [&, this]() {
if (num_compress_threads_ > 1) {
auto data = std::move(*buf_iter_);
buf_iter_++;
return data;
} else {
auto data =
CompressWorker::Compress(compression_, iter, header_.block_size);
auto data = CompressWorker::Compress(compression_.algorithm, iter,
header_.block_size);
return data;
}
}();
op.compression = compression_;
op.compression = compression_.algorithm;
op.data_length = static_cast<uint16_t>(data.size());
if (!WriteOperation(op, data.data(), data.size())) {

View file

@ -63,7 +63,7 @@ class CowWriterV2 : public CowWriterBase {
private:
CowFooter footer_{};
CowCompressionAlgorithm compression_ = kCowCompressNone;
CowCompression compression_;
uint64_t current_op_pos_ = 0;
uint64_t next_op_pos_ = 0;
uint64_t next_data_pos_ = 0;